apoorvmittal10 commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1991048323
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +81,11 @@ static LinkedHashMap<TopicIdPartition, Integer>
rotateRoundRobin(
return topicIdPartitions;
}
- // TODO: Once the partition max bytes is removed then the partition
will be a linked list and rotation
- // will be a simple operation. Else consider using
ImplicitLinkedHashCollection.
- LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new
LinkedHashMap<>(rotateAt);
- LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new
LinkedHashMap<>(topicIdPartitions.size());
- int i = 0;
- for (Map.Entry<TopicIdPartition, Integer> entry :
topicIdPartitions.entrySet()) {
- if (i < rotateAt) {
- suffixPartitions.put(entry.getKey(), entry.getValue());
- } else {
- rotatedPartitions.put(entry.getKey(), entry.getValue());
- }
- i++;
- }
- rotatedPartitions.putAll(suffixPartitions);
+ // We don't want to modify the original list, hence created a copy.
+ List<TopicIdPartition> rotatedPartitions = new
ArrayList<>(topicIdPartitions);
+ // We want the elements from the end of the list to move left by the
distance provided i.e. if the original list is [1,2,3],
+ // and we want to rotate it by 1, we want the output as [2,3,1] and
not [3,1,2]. Hence, we need negation of distance here.
Review Comment:
```suggestion
// Elements from the list should move left by the distance provided
i.e. if the original list is [1,2,3],
// and rotation is by 1, then output should be [2,3,1] and not
[3,1,2]. Hence, negate the distance here.
```
##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
@Test
public void testRoundRobinStrategy() {
PartitionRotateStrategy strategy =
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
- LinkedHashMap<TopicIdPartition, Integer> partitions =
createPartitions(3);
+ List<TopicIdPartition> partitions = createPartitions(3);
- LinkedHashMap<TopicIdPartition, Integer> result =
strategy.rotate(partitions, new PartitionRotateMetadata(1));
+ List<TopicIdPartition> result = strategy.rotate(partitions, new
PartitionRotateMetadata(1));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 1);
+ validateRotatedListEquals(partitions, result, 1);
// Session epoch is greater than the number of partitions.
result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 2);
+ validateRotatedListEquals(partitions, result, 2);
// Session epoch is at Integer.MAX_VALUE.
result = strategy.rotate(partitions, new
PartitionRotateMetadata(Integer.MAX_VALUE));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 1);
+ validateRotatedListEquals(partitions, result, 1);
// No rotation at same size as epoch.
result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 0);
+ validateRotatedListEquals(partitions, result, 0);
}
@Test
public void testRoundRobinStrategyWithSpecialSessionEpochs() {
PartitionRotateStrategy strategy =
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
- LinkedHashMap<TopicIdPartition, Integer> partitions =
createPartitions(3);
- LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
+ List<TopicIdPartition> partitions = createPartitions(3);
+ List<TopicIdPartition> result = strategy.rotate(
partitions,
new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 0);
+ validateRotatedListEquals(partitions, result, 0);
result = strategy.rotate(
partitions,
new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 0);
+ validateRotatedListEquals(partitions, result, 0);
}
@Test
public void testRoundRobinStrategyWithEmptyPartitions() {
PartitionRotateStrategy strategy =
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
// Empty partitions.
- LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new
LinkedHashMap<>(), new PartitionRotateMetadata(5));
+ List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new
PartitionRotateMetadata(5));
// The result should be empty.
assertTrue(result.isEmpty());
}
/**
- * Create an ordered map of TopicIdPartition to partition max bytes.
+ * Create an ordered set of topic partitions.
Review Comment:
```suggestion
* Create a list of topic partitions.
```
##########
server/src/test/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategyTest.java:
##########
@@ -35,64 +36,64 @@ public class PartitionRotateStrategyTest {
@Test
public void testRoundRobinStrategy() {
PartitionRotateStrategy strategy =
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
- LinkedHashMap<TopicIdPartition, Integer> partitions =
createPartitions(3);
+ List<TopicIdPartition> partitions = createPartitions(3);
- LinkedHashMap<TopicIdPartition, Integer> result =
strategy.rotate(partitions, new PartitionRotateMetadata(1));
+ List<TopicIdPartition> result = strategy.rotate(partitions, new
PartitionRotateMetadata(1));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 1);
+ validateRotatedListEquals(partitions, result, 1);
// Session epoch is greater than the number of partitions.
result = strategy.rotate(partitions, new PartitionRotateMetadata(5));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 2);
+ validateRotatedListEquals(partitions, result, 2);
// Session epoch is at Integer.MAX_VALUE.
result = strategy.rotate(partitions, new
PartitionRotateMetadata(Integer.MAX_VALUE));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 1);
+ validateRotatedListEquals(partitions, result, 1);
// No rotation at same size as epoch.
result = strategy.rotate(partitions, new PartitionRotateMetadata(3));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 0);
+ validateRotatedListEquals(partitions, result, 0);
}
@Test
public void testRoundRobinStrategyWithSpecialSessionEpochs() {
PartitionRotateStrategy strategy =
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
- LinkedHashMap<TopicIdPartition, Integer> partitions =
createPartitions(3);
- LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(
+ List<TopicIdPartition> partitions = createPartitions(3);
+ List<TopicIdPartition> result = strategy.rotate(
partitions,
new PartitionRotateMetadata(ShareRequestMetadata.INITIAL_EPOCH));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 0);
+ validateRotatedListEquals(partitions, result, 0);
result = strategy.rotate(
partitions,
new PartitionRotateMetadata(ShareRequestMetadata.FINAL_EPOCH));
assertEquals(3, result.size());
- validateRotatedMapEquals(partitions, result, 0);
+ validateRotatedListEquals(partitions, result, 0);
}
@Test
public void testRoundRobinStrategyWithEmptyPartitions() {
PartitionRotateStrategy strategy =
PartitionRotateStrategy.type(StrategyType.ROUND_ROBIN);
// Empty partitions.
- LinkedHashMap<TopicIdPartition, Integer> result = strategy.rotate(new
LinkedHashMap<>(), new PartitionRotateMetadata(5));
+ List<TopicIdPartition> result = strategy.rotate(new ArrayList<>(), new
PartitionRotateMetadata(5));
// The result should be empty.
assertTrue(result.isEmpty());
}
/**
- * Create an ordered map of TopicIdPartition to partition max bytes.
+ * Create an ordered set of topic partitions.
* @param size The number of topic-partitions to create.
- * @return The ordered map of TopicIdPartition to partition max bytes.
+ * @return The ordered set of topic partitions.
Review Comment:
```suggestion
* @return The list of topic partitions.
```
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +81,11 @@ static LinkedHashMap<TopicIdPartition, Integer>
rotateRoundRobin(
return topicIdPartitions;
}
- // TODO: Once the partition max bytes is removed then the partition
will be a linked list and rotation
- // will be a simple operation. Else consider using
ImplicitLinkedHashCollection.
- LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new
LinkedHashMap<>(rotateAt);
- LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new
LinkedHashMap<>(topicIdPartitions.size());
- int i = 0;
- for (Map.Entry<TopicIdPartition, Integer> entry :
topicIdPartitions.entrySet()) {
- if (i < rotateAt) {
- suffixPartitions.put(entry.getKey(), entry.getValue());
- } else {
- rotatedPartitions.put(entry.getKey(), entry.getValue());
- }
- i++;
- }
- rotatedPartitions.putAll(suffixPartitions);
+ // We don't want to modify the original list, hence created a copy.
Review Comment:
```suggestion
// Avoid modifying the original list, create copy.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]