Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2741686543
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -390,24 +416,29 @@ private void
replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
}
if (numPartitions == 1) {
+ int partitionId = (getPartitionIds() != null && getPartitionIds().size()
== 1)
+ ? getPartitionIds().get(0) : 0;
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups;
replicaGroupId++) {
List<String> instancesInReplicaGroup =
replicaGroupIdToInstancesMap.get(replicaGroupId);
if (replicaGroupId < existingNumReplicaGroups) {
- List<String> existingInstances =
_existingInstancePartitions.getInstances(0, replicaGroupId);
+ List<String> existingInstances =
getExistingInstancesOrEmpty(partitionId, replicaGroupId);
LinkedHashSet<String> candidateInstances = new
LinkedHashSet<>(instancesInReplicaGroup);
List<String> instances =
selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup,
candidateInstances, existingInstances);
LOGGER.info(
- "Selecting instances: {} for replica-group: {}, partition: 0 for
table: {}, existing instances: {}",
- instances, replicaGroupId, _tableNameWithType,
existingInstances);
- instancePartitions.setInstances(0, replicaGroupId, instances);
+ "Selecting instances: {} for replica-group: {}, partition: {}
for table: {}, existing instances: {}",
+ instances, replicaGroupId, partitionId, _tableNameWithType,
existingInstances);
+ instancePartitions.setInstances(partitionId, replicaGroupId,
instances);
} else {
- LOGGER.info("Selecting instances: {} for replica-group: {},
partition: 0 for table: {}, "
- + "there is no existing instances", instancesInReplicaGroup,
replicaGroupId, _tableNameWithType);
- instancePartitions.setInstances(0, replicaGroupId,
instancesInReplicaGroup);
+ LOGGER.info("Selecting instances: {} for replica-group: {},
partition: {} for table: {}, "
+ + "there is no existing instances", instancesInReplicaGroup,
replicaGroupId, partitionId,
+ _tableNameWithType);
+ instancePartitions.setInstances(partitionId, replicaGroupId,
instancesInReplicaGroup);
}
}
} else {
+ List<Integer> partitionIds = getPartitionIds() != null ?
getPartitionIds()
+ : IntStream.range(0,
numPartitions).boxed().collect(Collectors.toList());
Review Comment:
The method getPartitionIds() is called twice. Cache the result in a local
variable to avoid the redundant method call.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -196,10 +210,12 @@ private void replicaGroupBasedSimple(Map<Integer,
List<InstanceConfig>> poolToIn
// [i0, i1, i2, i3, i4]
// p0 p0 p0 p1 p1
// p1 p2 p2 p2
+ List<Integer> partitionIds = getPartitionIds() != null ? getPartitionIds()
+ : IntStream.range(0,
numPartitions).boxed().collect(Collectors.toList());
Review Comment:
The method getPartitionIds() is called twice. Cache the result in a local
variable to avoid the redundant method call.
```suggestion
List<Integer> partitionIds = getPartitionIds();
if (partitionIds == null) {
partitionIds = IntStream.range(0,
numPartitions).boxed().collect(Collectors.toList());
}
```
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java:
##########
@@ -96,6 +139,55 @@ public Set<Integer> fetchPartitionIds(long timeoutMillis) {
}
}
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatuses,
int timeoutMillis)
+ throws IOException, java.util.concurrent.TimeoutException {
+ if (!_partialPartitions) {
+ return
StreamMetadataProvider.super.computePartitionGroupMetadata(clientId,
streamConfig,
+ partitionGroupConsumptionStatuses, timeoutMillis);
+ }
+ List<Integer> subset = _partitionIdSubset;
+ Set<Integer> topicIds = fetchPartitionIds(timeoutMillis);
+ Map<Integer, StreamPartitionMsgOffset> consumptionByPartition = new
HashMap<>();
+ for (PartitionGroupConsumptionStatus s :
partitionGroupConsumptionStatuses) {
+ consumptionByPartition.put(s.getStreamPartitionGroupId(),
s.getEndOffset());
+ }
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ List<PartitionGroupMetadata> result = new ArrayList<>(subset.size());
+ for (Integer partitionId : subset) {
+ if (!topicIds.contains(partitionId)) {
+ LOGGER.warn(
+ "Configured partition id {} does not exist in topic {} when
computing partition group metadata. "
+ + "Topic partitions: {}. This indicates that topic partitions
may have changed between "
Review Comment:
The warning message is worded inconsistently with the kafka-2.0 implementation:
its second line says "Topic partitions: {}" where kafka-2.0 says "Current topic
partitions: {}". Keep the message text identical in both implementations, for
example by sharing a single format-string constant, so the two stay in sync.
```suggestion
+ "Current topic partitions: {}. This indicates that topic
partitions may have changed between "
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -390,24 +416,29 @@ private void
replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
}
if (numPartitions == 1) {
+ int partitionId = (getPartitionIds() != null && getPartitionIds().size()
== 1)
+ ? getPartitionIds().get(0) : 0;
Review Comment:
The method getPartitionIds() is called three times. Cache the result in a
local variable to avoid redundant method calls.
```suggestion
List<Integer> partitionIds = getPartitionIds();
int partitionId = (partitionIds != null && partitionIds.size() == 1)
? partitionIds.get(0) : 0;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]