Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2885302604
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -380,6 +379,32 @@ public CopyTableResponse copyTable(
}
}
+ @VisibleForTesting
+ static List<StreamMetadata> getStreamMetadataList(List<StreamConfig> streamConfigs,
+ WatermarkInductionResult watermarkInductionResult) {
+ Map<Integer, List<PartitionGroupMetadata>> partitionGroupMetadataByStreamConfigIndex = new HashMap<>();
+ for (WatermarkInductionResult.Watermark watermark : watermarkInductionResult.getWatermarks()) {
+ int streamConfigIndex =
+ IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId());
+ partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex, ignored -> new ArrayList<>()).add(
+ new PartitionGroupMetadata(watermark.getPartitionGroupId(), new LongMsgOffset(watermark.getOffset()),
+ watermark.getSequenceNumber()));
+ }
+
+ List<StreamMetadata> streamMetadataList = new ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size());
+ for (Map.Entry<Integer, List<PartitionGroupMetadata>> entry
+ : partitionGroupMetadataByStreamConfigIndex.entrySet()) {
+ int streamConfigIndex = entry.getKey();
+ List<PartitionGroupMetadata> partitionGroupMetadataList = entry.getValue();
+ // In the copy table flow, watermarks cover all consumed partitions, so the list size
+ // equals the partition count. The actual stream partition count is not available without
+ // a live stream connection, which is unnecessary here.
+ streamMetadataList.add(new StreamMetadata(streamConfigs.get(streamConfigIndex), streamConfigIndex,
+ partitionGroupMetadataList.size(), partitionGroupMetadataList));
+ }
+ return streamMetadataList;
Review Comment:
`getStreamMetadataList(...)` builds `streamMetadataList` by iterating over a
`HashMap`’s `entrySet()`, so the order of the returned list is unspecified
(the `HashMap` contract makes no ordering guarantee). Downstream code (e.g.,
segment setup / ideal-state updates) is easier to reason about and more
reproducible if the list is ordered by `streamConfigIndex` (or follows the
`streamConfigs` order). Consider iterating indices in order or sorting the
entries before constructing the list.
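A minimal sketch of the suggested fix, using hypothetical simplified records (`Watermark`, `StreamGroup`) in place of Pinot's `WatermarkInductionResult.Watermark` and `StreamMetadata`. It also assumes Pinot partition ids encode the stream index as `partitionId / 10000`; `PARTITION_PADDING_OFFSET` here is an assumption to verify against `IngestionConfigUtils`. Grouping into a `TreeMap` makes `entrySet()` iterate in ascending `streamConfigIndex` order, so the result no longer depends on `HashMap` iteration order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OrderedStreamMetadataSketch {
  // Hypothetical stand-ins for Pinot's Watermark and StreamMetadata types.
  public record Watermark(int partitionGroupId, long offset, int sequenceNumber) { }

  public record StreamGroup(int streamConfigIndex, int partitionCount, List<Watermark> watermarks) { }

  // Assumption: Pinot partition ids pad the stream index by this offset; verify against IngestionConfigUtils.
  static final int PARTITION_PADDING_OFFSET = 10_000;

  static int streamConfigIndexOf(int pinotPartitionId) {
    return pinotPartitionId / PARTITION_PADDING_OFFSET;
  }

  static List<StreamGroup> groupByStreamConfigIndex(List<Watermark> watermarks) {
    // TreeMap keeps its keys sorted, so entrySet() iterates in ascending streamConfigIndex
    // order regardless of the order in which watermarks arrive.
    Map<Integer, List<Watermark>> byIndex = new TreeMap<>();
    for (Watermark watermark : watermarks) {
      byIndex.computeIfAbsent(streamConfigIndexOf(watermark.partitionGroupId()), ignored -> new ArrayList<>())
          .add(watermark);
    }
    List<StreamGroup> result = new ArrayList<>(byIndex.size());
    for (Map.Entry<Integer, List<Watermark>> entry : byIndex.entrySet()) {
      // As in the PR, the partition count is taken from the number of watermarks for the stream.
      result.add(new StreamGroup(entry.getKey(), entry.getValue().size(), entry.getValue()));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Watermark> watermarks = List.of(
        new Watermark(20_001, 42L, 3),
        new Watermark(0, 10L, 1),
        new Watermark(10_000, 7L, 2),
        new Watermark(1, 11L, 1));
    // Prints "0 -> 2", then "1 -> 1", then "2 -> 1".
    for (StreamGroup group : groupByStreamConfigIndex(watermarks)) {
      System.out.println(group.streamConfigIndex() + " -> " + group.partitionCount());
    }
  }
}
```

An equivalent alternative that follows the `streamConfigs` order directly is to loop `for (int i = 0; i < streamConfigs.size(); i++)` and skip indices that have no watermarks.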
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -122,25 +137,28 @@ private Boolean fetchMultipleStreams()
.collect(Collectors.toList());
try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
- _newPartitionGroupMetadataList.addAll(
+ int partitionCount = streamMetadataProvider.fetchPartitionCount(/*timeoutMillis=*/15000);
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
streamMetadataProvider.computePartitionGroupMetadata(clientId,
streamConfig, topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000,
_forceGetOffsetFromStream)
Review Comment:
Same issue as the single-stream path: calling `fetchPartitionCount()` before
`computePartitionGroupMetadata()` can cause duplicate partition-count fetches
for implementations that use the default `computePartitionGroupMetadata()`
(notably Kafka). This doubles metadata calls per stream during ideal-state
updates.
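The doubled call can be illustrated with a hypothetical `MetadataProvider` interface whose default `computePartitionIds()` calls `fetchPartitionCount()` internally, which is the pattern the comment attributes to the default `computePartitionGroupMetadata()`. This is a sketch, not Pinot's API: `MetadataProvider`, `CountingProvider`, and `CachingProvider` are hypothetical names. Memoizing the count in a decorator created per refresh is one way to let the caller read the partition count without issuing a second metadata request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionCountCacheSketch {
  // Hypothetical, simplified stand-in for a stream metadata provider.
  interface MetadataProvider {
    int fetchPartitionCount(long timeoutMillis);

    // Mirrors the shape described in the review: the default implementation
    // fetches the partition count itself before building per-partition metadata.
    default List<Integer> computePartitionIds(long timeoutMillis) {
      int count = fetchPartitionCount(timeoutMillis);
      List<Integer> ids = new ArrayList<>(count);
      for (int i = 0; i < count; i++) {
        ids.add(i);
      }
      return ids;
    }
  }

  // Fake provider that counts how many (would-be remote) partition-count calls it receives.
  static final class CountingProvider implements MetadataProvider {
    final AtomicInteger calls = new AtomicInteger();
    private final int partitionCount;

    CountingProvider(int partitionCount) {
      this.partitionCount = partitionCount;
    }

    @Override
    public int fetchPartitionCount(long timeoutMillis) {
      calls.incrementAndGet();
      return partitionCount;
    }
  }

  // Fetches the count once and reuses it for the lifetime of the wrapper. Intended to be
  // created per refresh on a single thread (not thread-safe), so later refreshes see changes.
  static final class CachingProvider implements MetadataProvider {
    private final MetadataProvider delegate;
    private Integer cachedCount;

    CachingProvider(MetadataProvider delegate) {
      this.delegate = delegate;
    }

    @Override
    public int fetchPartitionCount(long timeoutMillis) {
      if (cachedCount == null) {
        cachedCount = delegate.fetchPartitionCount(timeoutMillis);
      }
      return cachedCount;
    }
  }

  public static void main(String[] args) {
    CountingProvider direct = new CountingProvider(4);
    direct.fetchPartitionCount(15_000);
    direct.computePartitionIds(15_000);
    System.out.println("direct: " + direct.calls.get() + " calls");   // direct: 2 calls

    CountingProvider underlying = new CountingProvider(4);
    CachingProvider cached = new CachingProvider(underlying);
    cached.fetchPartitionCount(15_000);
    cached.computePartitionIds(15_000);
    System.out.println("cached: " + underlying.calls.get() + " calls"); // cached: 1 calls
  }
}
```

One caveat of this decorator shape: it only forwards `fetchPartitionCount()`, so a provider that overrides the default metadata computation would be bypassed. For such providers, reading the count from the computed metadata (or skipping the separate fetch) may be the simpler fix.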
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -380,6 +379,32 @@ public CopyTableResponse copyTable(
+ streamMetadataList.add(new StreamMetadata(streamConfigs.get(streamConfigIndex), streamConfigIndex,
+ partitionGroupMetadataList.size(), partitionGroupMetadataList));
Review Comment:
`streamConfigs.get(streamConfigIndex)` can throw `IndexOutOfBoundsException`
if the watermark’s Pinot partition id encodes a stream index that isn’t present
in the target table config (e.g., config drift between source watermarks and
the copied realtime config). This currently bubbles up as a 500. Consider
validating `streamConfigIndex` against `streamConfigs.size()` and failing with
a clearer, user-actionable error (e.g., bad request) when it’s out of range.
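A minimal sketch of the suggested check, meant to run before `streamConfigs.get(streamConfigIndex)`. `BadRequestException` and `requireStreamConfig` are hypothetical names; `BadRequestException` stands in for whatever exception the controller maps to HTTP 400, and the generic `T` stands in for `StreamConfig`:

```java
import java.util.List;

public class StreamConfigIndexValidationSketch {
  // Hypothetical exception standing in for one that the REST layer maps to HTTP 400.
  static final class BadRequestException extends RuntimeException {
    BadRequestException(String message) {
      super(message);
    }
  }

  // Returns the stream config for the given index, or fails with an actionable message
  // instead of letting List.get() throw IndexOutOfBoundsException (surfaced as a 500).
  static <T> T requireStreamConfig(List<T> streamConfigs, int streamConfigIndex, int partitionGroupId) {
    if (streamConfigIndex < 0 || streamConfigIndex >= streamConfigs.size()) {
      throw new BadRequestException(String.format(
          "Watermark for partition group %d maps to stream config index %d, but the target table defines "
              + "only %d stream config(s); check that the source and target stream configs match",
          partitionGroupId, streamConfigIndex, streamConfigs.size()));
    }
    return streamConfigs.get(streamConfigIndex);
  }

  public static void main(String[] args) {
    List<String> streamConfigs = List.of("topicA", "topicB");
    System.out.println(requireStreamConfig(streamConfigs, 1, 10_003)); // topicB
    try {
      requireStreamConfig(streamConfigs, 2, 20_000);
    } catch (BadRequestException e) {
      System.out.println("400: " + e.getMessage());
    }
  }
}
```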
--
This is an automated message from the Apache Git Service.