Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2742499282
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +139,55 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
}
}
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
+ throws IOException, java.util.concurrent.TimeoutException {
+ if (!_partialPartitions) {
+ return StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, streamConfig,
+ partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+ List<Integer> subset = _partitionIdSubset;
+ Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+ Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new HashMap<>();
+ for (PartitionGroupConsumptionStatus s : partitionGroupConsumptionStatuses) {
+ consumptionByPartition.put(s.getStreamPartitionGroupId(), s.getEndOffset());
+ }
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+ for (Integer partitionId : subset) {
+ if (!topicIds.contains(partitionId)) {
+ LOGGER.warn(
+ "Configured partition id {} does not exist in topic {} when computing partition group metadata. "
+ + "This indicates that topic partitions may have changed between validation and metadata "
+ + "computation. Skipping this partition. Current topic partitions: {}",
+ partitionId, _topic, topicIds);
Review Comment:
The warn message places 'Current topic partitions: {}' at the very end of the
text, after the explanation and the 'Skipping this partition.' sentence, while
the earlier validation error at line 101 lists the topic partitions third.
For consistency, move 'Current topic partitions: {}' ahead of the explanatory
text so that it is the third element of the message, matching the error
message pattern. The argument list (partitionId, _topic, topicIds) already has
topicIds in third position.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +139,55 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
}
}
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses, int timeoutMillis)
+ throws IOException, java.util.concurrent.TimeoutException {
+ if (!_partialPartitions) {
+ return StreamMetadataProvider.super.computePartitionGroupMetadata(clientId, streamConfig,
+ partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+ List<Integer> subset = _partitionIdSubset;
+ Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+ Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new HashMap<>();
+ for (PartitionGroupConsumptionStatus s : partitionGroupConsumptionStatuses) {
+ consumptionByPartition.put(s.getStreamPartitionGroupId(), s.getEndOffset());
+ }
+ StreamConsumerFactory streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+ List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+ for (Integer partitionId : subset) {
+ if (!topicIds.contains(partitionId)) {
+ LOGGER.warn(
+ "Configured partition id {} does not exist in topic {} when computing partition group metadata. "
+ + "Current topic partitions: {}. This indicates that topic partitions may have changed between "
+ + "validation and metadata computation. Skipping this partition.",
Review Comment:
The structure of this warn message differs from the Kafka 2.0 implementation.
The Kafka 2.0 version (lines 162-163) places the explanatory text about
partition changes before 'Current topic partitions', while this version places
it after. Standardize the message order across the Kafka 2.0 and 3.0
implementations for consistency.
```suggestion
+ "This indicates that topic partitions may have changed between validation and metadata computation. "
+ "Current topic partitions: {}. Skipping this partition.",
```
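One possible way to keep the two plugins from drifting apart again is to define the template once and reference it from both providers. The sketch below is illustrative only: `MissingPartitionLog`, `TEMPLATE`, and `render` are hypothetical names that do not exist in Pinot, and `render` is a minimal stand-in for SLF4J-style `{}` substitution so the example runs without a logging dependency. The wording assumes the unchanged first line of the message followed by the suggested text.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper (not part of Pinot): a single template shared by both Kafka plugin versions. */
public final class MissingPartitionLog {
  // Placeholder order: partition id, topic name, current topic partition ids.
  public static final String TEMPLATE =
      "Configured partition id {} does not exist in topic {} when computing partition group metadata. "
          + "This indicates that topic partitions may have changed between validation and metadata computation. "
          + "Current topic partitions: {}. Skipping this partition.";

  private MissingPartitionLog() {
  }

  /** Replaces each "{}" in order with the matching argument; extra arguments are ignored. */
  public static String render(String template, Object... args) {
    StringBuilder sb = new StringBuilder();
    int from = 0;
    for (Object arg : args) {
      int idx = template.indexOf("{}", from);
      if (idx < 0) {
        break; // no placeholder left for this argument
      }
      sb.append(template, from, idx).append(arg);
      from = idx + 2;
    }
    // Append whatever follows the last substituted placeholder.
    return sb.append(template.substring(from)).toString();
  }

  public static void main(String[] args) {
    Set<Integer> topicIds = new TreeSet<>(Set.of(0, 1, 2));
    System.out.println(render(TEMPLATE, 5, "events", topicIds));
  }
}
```

In the providers themselves the call would presumably stay `LOGGER.warn(MissingPartitionLog.TEMPLATE, partitionId, _topic, topicIds)`, so the argument order is fixed in one place for both versions.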
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]