Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2882669162


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -350,17 +353,23 @@ public CopyTableResponse copyTable(
         return new CopyTableResponse("success", "Dry run", schema, realtimeTableConfig, watermarkInductionResult);
       }
 
-      List<Pair<PartitionGroupMetadata, Integer>> partitionGroupInfos = watermarkInductionResult.getWatermarks()
-          .stream()
-          .map(watermark -> Pair.of(
-              new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset())),
-              watermark.getSequenceNumber()))
+      List<StreamConfig> streamConfigs = IngestionConfigUtils.getStreamConfigs(realtimeTableConfig);
+      Map<Integer, List<PartitionGroupMetadata>> groupedByStream = watermarkInductionResult.getWatermarks().stream()
+          .collect(Collectors.groupingBy(
+              w -> IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(w.getPartitionGroupId()),
+              Collectors.mapping(
+                  w -> new PartitionGroupMetadata(w.getPartitionGroupId(),
+                      new LongMsgOffset(w.getOffset()), w.getSequenceNumber()),
+                  Collectors.toList())));
+      List<StreamMetadata> streamMetadataList = groupedByStream.entrySet().stream()
+          .map(e -> new StreamMetadata(streamConfigs.get(e.getKey()), e.getKey(),
+              e.getValue().size(), e.getValue()))

Review Comment:
   `StreamMetadata.getPartitionCount()` is documented as the total number of 
partitions for the stream (which may exceed the size of the partition-group 
metadata list). Here `StreamMetadata` is constructed with `partitionCount = 
e.getValue().size()`, which is just the number of watermarks returned and can 
undercount if some partitions have no watermark/segment. Please populate 
`partitionCount` with the actual stream partition count (or use a value whose 
semantics match the `StreamMetadata` contract).
   ```suggestion
             .map(e -> {
               StreamConfig streamConfig = streamConfigs.get(e.getKey());
               Integer partitionCountObj = streamConfig.getNumPartitions();
               int partitionCount = (partitionCountObj != null) ? partitionCountObj : e.getValue().size();
               return new StreamMetadata(streamConfig, e.getKey(), partitionCount, e.getValue());
             })
   ```
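
To make the undercount concrete, here is a minimal, self-contained sketch. It does not use the Pinot classes: `Watermark`, `countsFromWatermarks`, and `resolvePartitionCount` are hypothetical stand-ins, and the total of 4 partitions is an assumed value for illustration. It only shows that counting grouped watermarks can report fewer partitions than the stream has, and how a fallback like the one in the suggestion would behave.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class PartitionCountSketch {
  // Hypothetical stand-in for a watermark entry; not the Pinot class.
  static final class Watermark {
    final int streamIndex;
    final int partitionId;
    final long offset;

    Watermark(int streamIndex, int partitionId, long offset) {
      this.streamIndex = streamIndex;
      this.partitionId = partitionId;
      this.offset = offset;
    }
  }

  // Counts watermarks per stream index, analogous to e.getValue().size() in the diff.
  static Map<Integer, Integer> countsFromWatermarks(List<Watermark> watermarks) {
    return watermarks.stream().collect(
        Collectors.groupingBy(w -> w.streamIndex, TreeMap::new, Collectors.summingInt(w -> 1)));
  }

  // Prefers a known total partition count; falls back to the watermark count when it is unknown.
  static int resolvePartitionCount(Integer knownTotal, int watermarkCount) {
    return (knownTotal != null) ? knownTotal : watermarkCount;
  }

  public static void main(String[] args) {
    // Assumption: stream 0 has 4 partitions, but only partitions 0 and 2 have a watermark.
    List<Watermark> watermarks = List.of(new Watermark(0, 0, 100L), new Watermark(0, 2, 250L));
    int fromWatermarks = countsFromWatermarks(watermarks).get(0);

    System.out.println(fromWatermarks);                              // 2 (undercounts)
    System.out.println(resolvePartitionCount(4, fromWatermarks));    // 4
    System.out.println(resolvePartitionCount(null, fromWatermarks)); // 2 (fallback)
  }
}
```

The fallback keeps the current behavior when no total is available, so the undercount remains possible in that case.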



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

