Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2885302604


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -380,6 +379,32 @@ public CopyTableResponse copyTable(
     }
   }
 
+  @VisibleForTesting
+  static List<StreamMetadata> getStreamMetadataList(List<StreamConfig> streamConfigs,
+      WatermarkInductionResult watermarkInductionResult) {
+    Map<Integer, List<PartitionGroupMetadata>> partitionGroupMetadataByStreamConfigIndex = new HashMap<>();
+    for (WatermarkInductionResult.Watermark watermark : watermarkInductionResult.getWatermarks()) {
+      int streamConfigIndex =
+          IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId());
+      partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex, ignored -> new ArrayList<>()).add(
+          new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset()),
+              watermark.getSequenceNumber()));
+    }
+
+    List<StreamMetadata> streamMetadataList = new ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size());
+    for (Map.Entry<Integer, List<PartitionGroupMetadata>> entry
+        : partitionGroupMetadataByStreamConfigIndex.entrySet()) {
+      int streamConfigIndex = entry.getKey();
+      List<PartitionGroupMetadata> partitionGroupMetadataList = entry.getValue();
+      // In the copy table flow, watermarks cover all consumed partitions, so the list size
+      // equals the partition count. The actual stream partition count is not available without
+      // a live stream connection, which is unnecessary here.
+      streamMetadataList.add(new StreamMetadata(streamConfigs.get(streamConfigIndex), streamConfigIndex,
+          partitionGroupMetadataList.size(), partitionGroupMetadataList));
+    }
+    return streamMetadataList;

Review Comment:
   `getStreamMetadataList(...)` builds `streamMetadataList` by iterating over a 
`HashMap`’s `entrySet()`, which makes the returned list order 
non-deterministic. Downstream code (e.g., segment setup / ideal-state updates) 
is easier to reason about and more reproducible if the list is ordered by 
`streamConfigIndex` (or follows the `streamConfigs` order). Consider iterating 
indices in order or sorting the entries before constructing the list.
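
   A minimal sketch of the ordering suggestion, not the PR's code. `HashMap` does not specify an iteration order, whereas `TreeMap` iterates in ascending key order, so grouping into a `TreeMap` yields entries sorted by `streamConfigIndex`. The `Watermark` class here is a hypothetical stand-in that carries only the two fields the sketch needs, and the partition group ids are arbitrary values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OrderedGroupingSketch {
  // Hypothetical stand-in for a watermark; not Pinot's WatermarkInductionResult.Watermark.
  public static final class Watermark {
    final int streamConfigIndex;
    final int partitionGroupId;

    public Watermark(int streamConfigIndex, int partitionGroupId) {
      this.streamConfigIndex = streamConfigIndex;
      this.partitionGroupId = partitionGroupId;
    }
  }

  // Groups partition group ids by stream config index.
  // TreeMap keeps keys sorted, so callers see stream config indices in ascending order.
  public static Map<Integer, List<Integer>> groupByStreamConfigIndex(List<Watermark> watermarks) {
    Map<Integer, List<Integer>> grouped = new TreeMap<>();
    for (Watermark watermark : watermarks) {
      grouped.computeIfAbsent(watermark.streamConfigIndex, ignored -> new ArrayList<>())
          .add(watermark.partitionGroupId);
    }
    return grouped;
  }
}
```

   Within each group, ids keep the order in which the watermarks were supplied; only the order of the groups changes.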



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -122,25 +137,28 @@ private Boolean fetchMultipleStreams()
               .collect(Collectors.toList());
       try (StreamMetadataProvider streamMetadataProvider = streamConsumerFactory.createStreamMetadataProvider(
           StreamConsumerFactory.getUniqueClientId(clientId))) {
-        _newPartitionGroupMetadataList.addAll(
+        int partitionCount = streamMetadataProvider.fetchPartitionCount(/*timeoutMillis=*/15000);
+        List<PartitionGroupMetadata> partitionGroupMetadataList =
             streamMetadataProvider.computePartitionGroupMetadata(clientId,
                     streamConfig, topicPartitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
                     _forceGetOffsetFromStream)

Review Comment:
   Same issue as the single-stream path: calling `fetchPartitionCount()` before 
`computePartitionGroupMetadata()` can cause duplicate partition-count fetches 
for implementations that use the default `computePartitionGroupMetadata()` 
(notably Kafka). This doubles metadata calls per stream during ideal-state 
updates.
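
   One possible way to avoid the second fetch, sketched under assumptions: memoize the partition count for the lifetime of the provider instance, so that an explicit `fetchPartitionCount()` call followed by a method that derives partition groups from the count triggers only one remote call. `MemoizingProvider` and `computePartitionGroupIds()` are hypothetical stand-ins, not Pinot's `StreamMetadataProvider` API, and memoization is a technique chosen for this illustration rather than something the PR proposes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntSupplier;

public class PartitionCountMemoSketch {
  // Hypothetical provider; the IntSupplier stands in for the remote metadata call.
  public static final class MemoizingProvider {
    private final IntSupplier remoteFetch;
    private Integer cachedCount; // null until the first fetch
    private int remoteCalls;

    public MemoizingProvider(IntSupplier remoteFetch) {
      this.remoteFetch = remoteFetch;
    }

    // Performs the remote call at most once per provider instance.
    public int fetchPartitionCount() {
      if (cachedCount == null) {
        remoteCalls++;
        cachedCount = remoteFetch.getAsInt();
      }
      return cachedCount;
    }

    // Mimics a default implementation that derives partition groups from the partition count.
    public List<Integer> computePartitionGroupIds() {
      int count = fetchPartitionCount();
      List<Integer> ids = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        ids.add(i);
      }
      return ids;
    }

    public int getRemoteCalls() {
      return remoteCalls;
    }
  }
}
```

   This only makes sense when the provider is short-lived, as with the try-with-resources block in the diff; a long-lived provider would need some way to refresh the cached count.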



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -380,6 +379,32 @@ public CopyTableResponse copyTable(
     }
   }
 
+  @VisibleForTesting
+  static List<StreamMetadata> getStreamMetadataList(List<StreamConfig> streamConfigs,
+      WatermarkInductionResult watermarkInductionResult) {
+    Map<Integer, List<PartitionGroupMetadata>> partitionGroupMetadataByStreamConfigIndex = new HashMap<>();
+    for (WatermarkInductionResult.Watermark watermark : watermarkInductionResult.getWatermarks()) {
+      int streamConfigIndex =
+          IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId());
+      partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex, ignored -> new ArrayList<>()).add(
+          new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset()),
+              watermark.getSequenceNumber()));
+    }
+
+    List<StreamMetadata> streamMetadataList = new ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size());
+    for (Map.Entry<Integer, List<PartitionGroupMetadata>> entry
+        : partitionGroupMetadataByStreamConfigIndex.entrySet()) {
+      int streamConfigIndex = entry.getKey();
+      List<PartitionGroupMetadata> partitionGroupMetadataList = entry.getValue();
+      // In the copy table flow, watermarks cover all consumed partitions, so the list size
+      // equals the partition count. The actual stream partition count is not available without
+      // a live stream connection, which is unnecessary here.
+      streamMetadataList.add(new StreamMetadata(streamConfigs.get(streamConfigIndex), streamConfigIndex,
+          partitionGroupMetadataList.size(), partitionGroupMetadataList));

Review Comment:
   `streamConfigs.get(streamConfigIndex)` can throw `IndexOutOfBoundsException` 
if the watermark’s Pinot partition id encodes a stream index that isn’t present 
in the target table config (e.g., config drift between source watermarks and 
the copied realtime config). This currently bubbles up as a 500. Consider 
validating `streamConfigIndex` against `streamConfigs.size()` and failing with 
a clearer, user-actionable error (e.g., bad request) when it’s out of range.
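
   A minimal sketch of the bounds check, assuming a hypothetical helper named `getStreamConfigOrThrow`. It throws `IllegalArgumentException` with a message that names the offending partition group and index; how the resource would translate that into a bad-request response is left out here and would depend on the controller's existing error handling.

```java
import java.util.List;

public class StreamConfigIndexCheckSketch {
  // Returns the stream config at the given index, or fails with a descriptive message
  // instead of letting List.get() throw IndexOutOfBoundsException.
  public static <T> T getStreamConfigOrThrow(List<T> streamConfigs, int streamConfigIndex,
      int partitionGroupId) {
    if (streamConfigIndex < 0 || streamConfigIndex >= streamConfigs.size()) {
      throw new IllegalArgumentException(String.format(
          "Watermark for partition group %d refers to stream config index %d, "
              + "but the table config defines only %d stream config(s)",
          partitionGroupId, streamConfigIndex, streamConfigs.size()));
    }
    return streamConfigs.get(streamConfigIndex);
  }
}
```

   The helper is generic so the sketch stays self-contained; in the method under review the element type would be `StreamConfig`.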



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

