apoorvmittal10 commented on code in PR #19148:
URL: https://github.com/apache/kafka/pull/19148#discussion_r1986418726
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -258,14 +257,14 @@ public CompletableFuture<Map<TopicIdPartition,
PartitionData>> fetchMessages(
FetchParams fetchParams,
int sessionEpoch,
int batchSize,
- LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes
+ ArrayList<TopicIdPartition> topicPartitions
Review Comment:
```suggestion
List<TopicIdPartition> topicIdPartitions
```
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -526,7 +518,7 @@ public void testForceCompleteTriggersDelayedActionsQueue() {
TopicIdPartition tp0 = new TopicIdPartition(topicId, new
TopicPartition("foo", 0));
TopicIdPartition tp1 = new TopicIdPartition(topicId, new
TopicPartition("foo", 1));
TopicIdPartition tp2 = new TopicIdPartition(topicId, new
TopicPartition("foo", 2));
- LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes1 =
orderedMap(PARTITION_MAX_BYTES, tp0, tp1);
+ ArrayList<TopicIdPartition> topicIdPartitions1 = arrayList(tp0, tp1);
Review Comment:
```suggestion
List<TopicIdPartition> topicIdPartitions1 = List.of(tp0, tp1);
```
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -48,7 +47,7 @@ public String toString() {
*
* @return the rotated topicIdPartitions
*/
- LinkedHashMap<TopicIdPartition, Integer>
rotate(LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions,
PartitionRotateMetadata metadata);
+ ArrayList<TopicIdPartition> rotate(ArrayList<TopicIdPartition>
topicIdPartitions, PartitionRotateMetadata metadata);
Review Comment:
```suggestion
List<TopicIdPartition> rotate(List<TopicIdPartition> topicIdPartitions,
PartitionRotateMetadata metadata);
```
##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java:
##########
@@ -151,7 +149,7 @@ public String toString() {
}
private final ShareFetchRequestData data;
- private volatile LinkedHashMap<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchData = null;
+ private volatile ArrayList<TopicIdPartition> shareFetchData = null;
Review Comment:
```suggestion
private volatile List<TopicIdPartition> shareFetchData = null;
```
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -64,8 +63,8 @@ static PartitionRotateStrategy type(StrategyType type) {
*
* @return the rotated topicIdPartitions
*/
- static LinkedHashMap<TopicIdPartition, Integer> rotateRoundRobin(
- LinkedHashMap<TopicIdPartition, Integer> topicIdPartitions,
+ static ArrayList<TopicIdPartition> rotateRoundRobin(
Review Comment:
```suggestion
static List<TopicIdPartition> rotateRoundRobin(
```
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java:
##########
@@ -110,25 +109,20 @@ public synchronized LastUsedKey lastUsedKey() {
return new LastUsedKey(key, lastUsedMs);
}
- // Visible for testing
- public synchronized long creationMs() {
- return creationMs;
- }
-
// Update the cached partition data based on the request.
- public synchronized Map<ModifiedTopicIdPartitionType,
List<TopicIdPartition>> update(Map<TopicIdPartition,
- ShareFetchRequest.SharePartitionData> shareFetchData,
List<TopicIdPartition> toForget) {
+ public synchronized Map<ModifiedTopicIdPartitionType,
List<TopicIdPartition>> update(
+ List<TopicIdPartition> shareFetchData,
+ List<TopicIdPartition> toForget) {
Review Comment:
```suggestion
List<TopicIdPartition> toForget
) {
```
##########
server/src/main/java/org/apache/kafka/server/share/fetch/PartitionRotateStrategy.java:
##########
@@ -80,20 +79,18 @@ static LinkedHashMap<TopicIdPartition, Integer>
rotateRoundRobin(
return topicIdPartitions;
}
- // TODO: Once the partition max bytes is removed then the partition
will be a linked list and rotation
- // will be a simple operation. Else consider using
ImplicitLinkedHashCollection.
- LinkedHashMap<TopicIdPartition, Integer> suffixPartitions = new
LinkedHashMap<>(rotateAt);
- LinkedHashMap<TopicIdPartition, Integer> rotatedPartitions = new
LinkedHashMap<>(topicIdPartitions.size());
+ ArrayList<TopicIdPartition> suffixPartitions = new
ArrayList<>(rotateAt);
+ ArrayList<TopicIdPartition> rotatedPartitions = new
ArrayList<>(topicIdPartitions.size());
int i = 0;
- for (Map.Entry<TopicIdPartition, Integer> entry :
topicIdPartitions.entrySet()) {
+ for (TopicIdPartition topicIdPartition : topicIdPartitions) {
if (i < rotateAt) {
- suffixPartitions.put(entry.getKey(), entry.getValue());
+ suffixPartitions.add(topicIdPartition);
} else {
- rotatedPartitions.put(entry.getKey(), entry.getValue());
+ rotatedPartitions.add(topicIdPartition);
}
i++;
}
- rotatedPartitions.putAll(suffixPartitions);
+ rotatedPartitions.addAll(suffixPartitions);
Review Comment:
Should we use `Collections.rotate` instead?
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -427,29 +426,20 @@ private CompletableFuture<Map<TopicIdPartition,
ShareAcknowledgeResponseData.Par
/**
* The newContext method is used to create a new share fetch context for
every share fetch request.
* @param groupId The group id in the share fetch request.
- * @param shareFetchData The topic-partitions and their corresponding
maxBytes data in the share fetch request.
+ * @param shareFetchData The topic-partitions in the share fetch request.
* @param toForget The topic-partitions to forget present in the share
fetch request.
* @param reqMetadata The metadata in the share fetch request.
* @param isAcknowledgeDataPresent This tells whether the fetch request
received includes piggybacked acknowledgements or not
* @return The new share fetch context object
*/
- public ShareFetchContext newContext(String groupId, Map<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchData,
+ public ShareFetchContext newContext(String groupId, List<TopicIdPartition>
shareFetchData,
List<TopicIdPartition> toForget,
ShareRequestMetadata reqMetadata, Boolean isAcknowledgeDataPresent) {
ShareFetchContext context;
- // TopicPartition with maxBytes as 0 should not be added in the
cachedPartitions
- Map<TopicIdPartition, ShareFetchRequest.SharePartitionData>
shareFetchDataWithMaxBytes = new HashMap<>();
- shareFetchData.forEach((tp, sharePartitionData) -> {
- if (sharePartitionData.maxBytes > 0)
shareFetchDataWithMaxBytes.put(tp, sharePartitionData);
- });
// If the request's epoch is FINAL_EPOCH or INITIAL_EPOCH, we should
remove the existing sessions. Also, start a
// new session in case it is INITIAL_EPOCH. Hence, we need to treat
them as special cases.
if (reqMetadata.isFull()) {
ShareSessionKey key = shareSessionKey(groupId,
reqMetadata.memberId());
if (reqMetadata.epoch() == ShareRequestMetadata.FINAL_EPOCH) {
- // If the epoch is FINAL_EPOCH, don't try to create a new
session.
- if (!shareFetchDataWithMaxBytes.isEmpty()) {
- throw Errors.INVALID_REQUEST.exception();
- }
Review Comment:
Shouldn't we still have a similar check here, i.e. reject the request if topic
partitions are included in a fetch request with the final epoch?
##########
server/src/main/java/org/apache/kafka/server/share/session/ShareSession.java:
##########
@@ -110,25 +109,20 @@ public synchronized LastUsedKey lastUsedKey() {
return new LastUsedKey(key, lastUsedMs);
}
- // Visible for testing
- public synchronized long creationMs() {
- return creationMs;
- }
Review Comment:
Why have we removed `creationMs()`? Is it no longer being used?
##########
server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java:
##########
@@ -30,34 +30,30 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.LinkedHashMap;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Helper functions for writing share fetch unit tests.
*/
public class ShareFetchTestUtils {
/**
- * Create an ordered map of TopicIdPartition to partition max bytes.
+ * Create an ArrayList of topic partitions.
*
- * @param partitionMaxBytes The maximum number of bytes that can be
fetched for each partition.
- * @param topicIdPartitions The topic partitions to create the map for.
- * @return The ordered map of TopicIdPartition to partition max bytes.
+ * @param topicIdPartitions The topic partitions to create the list for.
+ * @return The list of topic partitions.
*/
- public static LinkedHashMap<TopicIdPartition, Integer> orderedMap(int
partitionMaxBytes, TopicIdPartition... topicIdPartitions) {
- LinkedHashMap<TopicIdPartition, Integer> map = new LinkedHashMap<>();
- for (TopicIdPartition tp : topicIdPartitions) {
- map.put(tp, partitionMaxBytes);
- }
- return map;
+ public static ArrayList<TopicIdPartition> arrayList(TopicIdPartition...
topicIdPartitions) {
+ ArrayList<TopicIdPartition> list = new ArrayList<>();
+ Collections.addAll(list, topicIdPartitions);
+ return list;
}
Review Comment:
Why not use `List.of()`? Do we need a mutable list?
##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java:
##########
@@ -226,23 +189,18 @@ public int maxWait() {
return data.maxWaitMs();
}
- public Map<TopicIdPartition, ShareFetchRequest.SharePartitionData>
shareFetchData(Map<Uuid, String> topicNames) {
+ public List<TopicIdPartition> shareFetchData(Map<Uuid, String> topicNames)
{
if (shareFetchData == null) {
synchronized (this) {
if (shareFetchData == null) {
// Assigning the lazy-initialized `shareFetchData` in the
last step
// to avoid other threads accessing a half-initialized
object.
- final LinkedHashMap<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchDataTmp = new LinkedHashMap<>();
+ final ArrayList<TopicIdPartition> shareFetchDataTmp = new
ArrayList<>();
Review Comment:
```suggestion
final List<TopicIdPartition> shareFetchDataTmp = new
ArrayList<>();
```
##########
core/src/test/java/kafka/server/share/DelayedShareFetchTest.java:
##########
@@ -134,7 +132,7 @@ public void
testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartit
sharePartitions.put(tp1, sp1);
ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId,
Uuid.randomUuid().toString(),
- new CompletableFuture<>(), partitionMaxBytes, BATCH_SIZE,
MAX_FETCH_RECORDS,
+ new CompletableFuture<>(), arrayList(tp0, tp1), BATCH_SIZE,
MAX_FETCH_RECORDS,
Review Comment:
```suggestion
new CompletableFuture<>(), List.of(tp0, tp1), BATCH_SIZE,
MAX_FETCH_RECORDS,
```
##########
clients/src/main/java/org/apache/kafka/common/requests/ShareFetchRequest.java:
##########
@@ -151,7 +149,7 @@ public String toString() {
}
private final ShareFetchRequestData data;
- private volatile LinkedHashMap<TopicIdPartition,
ShareFetchRequest.SharePartitionData> shareFetchData = null;
+ private volatile ArrayList<TopicIdPartition> shareFetchData = null;
Review Comment:
Is there a reason to use `ArrayList` specifically rather than declaring the
field with the `List` interface?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]