Copilot commented on code in PR #17587:
URL: https://github.com/apache/pinot/pull/17587#discussion_r2732861641
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/README.md:
##########
@@ -40,3 +40,9 @@ Below is a sample `streamConfigs` used to create a real-time table with Kafka co
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
}
```
+
Review Comment:
Corrected incorrect code fence language identifier from '$xslt' to 'json'.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -433,26 +454,33 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
partitionIdToInstanceSetMap.add(instanceSet);
// Keep the existing instances that are still alive
- if (partitionId < existingNumPartitions) {
+ if (idx < existingNumPartitions) {
List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
- partitionIdToExistingInstancesMap.add(existingInstances);
- int numInstancesToCheck = Math.min(numInstancesPerPartition, existingInstances.size());
- for (int i = 0; i < numInstancesToCheck; i++) {
- String existingInstance = existingInstances.get(i);
- Integer numPartitionsOnInstance = instanceToNumPartitionsMap.get(existingInstance);
- if (numPartitionsOnInstance != null && numPartitionsOnInstance < maxNumPartitionsPerInstance) {
- instances.set(i, existingInstance);
- instanceSet.add(existingInstance);
- instanceToNumPartitionsMap.put(existingInstance, numPartitionsOnInstance + 1);
+ if (existingInstances != null) {
+ partitionIdToExistingInstancesMap.add(existingInstances);
Review Comment:
Multiple locations in this method now check if existingInstances is null and
add empty lists. Consider extracting this logic into a helper method to reduce
duplication and improve readability.
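A minimal sketch of the helper this comment suggests, assuming a simplified map-based stand-in for `_existingInstancePartitions` (partition id to replica group id to instances). `ExistingInstancesHelper` and `getExistingInstancesOrEmpty` are hypothetical names, not Pinot APIs. The null-to-empty fallback lives in one method, so the call sites in `replicaGroupBasedMinimumMovement` would no longer each repeat the `existingInstances != null` check.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ExistingInstancesHelper {
  // Hypothetical stand-in for the existing instance partitions:
  // partitionId -> (replicaGroupId -> instances). Not Pinot's InstancePartitions class.
  private final Map<Integer, Map<Integer, List<String>>> _existing;

  public ExistingInstancesHelper(Map<Integer, Map<Integer, List<String>>> existing) {
    _existing = existing;
  }

  // Returns the existing instances for (partitionId, replicaGroupId), or an
  // immutable empty list when either the partition or the replica group is absent.
  public List<String> getExistingInstancesOrEmpty(int partitionId, int replicaGroupId) {
    Map<Integer, List<String>> byReplicaGroup = _existing.get(partitionId);
    List<String> instances = byReplicaGroup == null ? null : byReplicaGroup.get(replicaGroupId);
    return Objects.requireNonNullElse(instances, List.of());
  }

  public static void main(String[] args) {
    ExistingInstancesHelper helper = new ExistingInstancesHelper(
        Map.of(0, Map.of(0, List.of("Server_0", "Server_1"))));
    System.out.println(helper.getExistingInstancesOrEmpty(0, 0)); // [Server_0, Server_1]
    System.out.println(helper.getExistingInstancesOrEmpty(2, 0)); // []
  }
}
```

`Objects.requireNonNullElse` needs Java 9 or later, the same baseline as the `List.of()` call already used in this PR.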
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -160,12 +160,14 @@ protected List<String> assignConsumingSegment(int segmentPartitionId, InstancePa
instancesAssigned.add(instances.get(segmentPartitionId % instances.size()));
}
} else {
- // Explicit partition:
- // Assign segment to the first instance within the partition.
-
+ // Explicit partition: instance partitions are keyed by stream partition id (supports non-contiguous subset).
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
- int partitionId = segmentPartitionId % numPartitions;
- instancesAssigned.add(instancePartitions.getInstances(partitionId, replicaGroupId).get(0));
+ List<String> instances = instancePartitions.getInstances(segmentPartitionId, replicaGroupId);
+ Preconditions.checkState(instances != null && !instances.isEmpty(),
"No instances for partition %s in CONSUMING instance partitions (table: %s). "
+ "Check stream partition subset config matches instance partition selection.",
Review Comment:
The error message could be more actionable by specifying which configuration
properties to check. Consider mentioning 'stream.kafka.partition.ids'
explicitly in the error message.
```suggestion
+ "Check that the stream partition subset configuration (for example, 'stream.kafka.partition.ids') "
+ "matches the instance partition selection in the table configuration.",
```
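A dependency-free sketch of the explicit-partition lookup, assuming a hypothetical map of CONSUMING instance partitions keyed by stream partition id (here the non-contiguous subset {0, 2, 5}). It throws a plain `IllegalStateException` in place of Guava's `Preconditions.checkState`, and its message follows the suggestion above; the config key `stream.kafka.partition.ids` comes from that suggestion and is not verified here. The class and method names are illustrative only. If instance partitions are keyed by stream partition id, the removed `segmentPartitionId % numPartitions` lookup would send stream partition 5 to key 5 % 3 = 2, that is, to the instances of stream partition 2; looking up the id directly avoids that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ExplicitPartitionAssignment {
  // Hypothetical stand-in for CONSUMING instance partitions, keyed by stream
  // partition id: partitionId -> per-replica-group instance lists.
  private final Map<Integer, List<List<String>>> _instancePartitions;
  private final String _tableName;

  public ExplicitPartitionAssignment(Map<Integer, List<List<String>>> instancePartitions, String tableName) {
    _instancePartitions = instancePartitions;
    _tableName = tableName;
  }

  // Looks up instances by the stream partition id itself (no modulo) and fails
  // fast with an actionable message when the partition has no instances.
  public List<String> assignConsumingSegment(int segmentPartitionId, int numReplicaGroups) {
    List<List<String>> replicaGroups = _instancePartitions.get(segmentPartitionId);
    List<String> assigned = new ArrayList<>(numReplicaGroups);
    for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
      List<String> instances = (replicaGroups != null && replicaGroupId < replicaGroups.size())
          ? replicaGroups.get(replicaGroupId) : null;
      if (instances == null || instances.isEmpty()) {
        throw new IllegalStateException(String.format(
            "No instances for partition %s in CONSUMING instance partitions (table: %s). "
                + "Check that the stream partition subset configuration (for example, "
                + "'stream.kafka.partition.ids') matches the instance partition selection "
                + "in the table configuration.",
            segmentPartitionId, _tableName));
      }
      // Assumption carried over from the removed code: take the first instance of the partition.
      assigned.add(instances.get(0));
    }
    return assigned;
  }

  public static void main(String[] args) {
    // The table consumes only the non-contiguous stream partitions {0, 2, 5}.
    Map<Integer, List<List<String>>> partitions = Map.of(
        0, List.of(List.of("Server_0"), List.of("Server_3")),
        2, List.of(List.of("Server_1"), List.of("Server_4")),
        5, List.of(List.of("Server_2"), List.of("Server_5")));
    ExplicitPartitionAssignment assignment = new ExplicitPartitionAssignment(partitions, "myTable_REALTIME");
    System.out.println(assignment.assignConsumingSegment(5, 2)); // [Server_2, Server_5]
    try {
      assignment.assignConsumingSegment(1, 2); // partition 1 is not in the subset
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```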
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java:
##########
@@ -390,24 +404,30 @@ private void replicaGroupBasedMinimumMovement(Map<Integer, List<InstanceConfig>>
}
if (numPartitions == 1) {
+ int partitionId = (getPartitionIds() != null && getPartitionIds().size() == 1)
+ ? getPartitionIds().get(0) : 0;
for (int replicaGroupId = 0; replicaGroupId < numReplicaGroups; replicaGroupId++) {
List<String> instancesInReplicaGroup = replicaGroupIdToInstancesMap.get(replicaGroupId);
if (replicaGroupId < existingNumReplicaGroups) {
- List<String> existingInstances = _existingInstancePartitions.getInstances(0, replicaGroupId);
+ List<String> existingInstances = _existingInstancePartitions.getInstances(partitionId, replicaGroupId);
LinkedHashSet<String> candidateInstances = new LinkedHashSet<>(instancesInReplicaGroup);
List<String> instances =
- selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances, existingInstances);
+ selectInstancesWithMinimumMovement(numInstancesPerReplicaGroup, candidateInstances,
+ existingInstances != null ? existingInstances : List.of());
Review Comment:
The null check for existingInstances could be simplified by handling it at
the source. Consider checking if _existingInstancePartitions.getInstances()
returns null before calling selectInstancesWithMinimumMovement to avoid the
ternary operator.
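The single-partition path can be sketched as follows, under stated assumptions: `resolveSinglePartitionId` copies the expression added in this hunk, while `existingInstancesOrEmpty` and `selectWithMinimumMovement` are hypothetical, simplified helpers, not Pinot's `selectInstancesWithMinimumMovement`. Normalizing a missing lookup to an empty list where it is fetched, as the comment proposes, lets the selection call take `existingInstances` directly without a ternary.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

public class SinglePartitionSelection {
  // Mirrors the diff: use the sole configured partition id, otherwise fall back to 0.
  public static int resolveSinglePartitionId(List<Integer> partitionIds) {
    return (partitionIds != null && partitionIds.size() == 1) ? partitionIds.get(0) : 0;
  }

  // Hypothetical lookup that handles "no existing assignment" at the source,
  // so callers always receive a non-null list.
  public static List<String> existingInstancesOrEmpty(Map<Integer, List<String>> existingByPartition,
      int partitionId) {
    List<String> existing = existingByPartition.get(partitionId);
    return existing != null ? existing : List.of();
  }

  // Simplified minimum-movement selection (NOT Pinot's implementation): keep existing
  // instances that are still candidates, then fill the remaining slots from the other
  // candidates in iteration order. Mutates the candidate set.
  public static List<String> selectWithMinimumMovement(int numInstances, LinkedHashSet<String> candidates,
      List<String> existing) {
    List<String> selected = new ArrayList<>(numInstances);
    for (String instance : existing) {
      if (selected.size() < numInstances && candidates.remove(instance)) {
        selected.add(instance);
      }
    }
    for (String instance : candidates) {
      if (selected.size() == numInstances) {
        break;
      }
      selected.add(instance);
    }
    return selected;
  }

  public static void main(String[] args) {
    int partitionId = resolveSinglePartitionId(List.of(7));
    Map<Integer, List<String>> existing = Map.of(7, List.of("Server_2", "Server_9"));
    LinkedHashSet<String> candidates = new LinkedHashSet<>(List.of("Server_1", "Server_2", "Server_3"));
    // Server_2 is kept, Server_9 is no longer a candidate, Server_1 fills the free slot.
    System.out.println(selectWithMinimumMovement(2, candidates, existingInstancesOrEmpty(existing, partitionId)));
    // prints [Server_2, Server_1]
  }
}
```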
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.