jeffkbkim commented on code in PR #17700:
URL: https://github.com/apache/kafka/pull/17700#discussion_r1917124898
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +457,36 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
+    private Set<Integer> nodesWithBufferedPartitions(Set<TopicPartition> buffered, long currentTimeMs) {
+        Set<Integer> nodesWithBufferedPartitions = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            if (!subscriptions.isAssigned(partition)) {
+                // It's possible that a partition with buffered data from a previous request is now no longer
+                // assigned to the consumer, in which case just skip this partition.
+                continue;
+            }
+
+            SubscriptionState.FetchPosition position = subscriptions.position(partition);
Review Comment:
In nodesWithBufferedPartitions, we skip the partition if its position is null or if the leader is empty.
Previously, we would throw an IllegalStateException (L409) or log and request a metadata update for buffered partitions.
Is there a reason for this divergence, or should we merge the two pieces of logic?
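If it helps, here is a minimal, hypothetical sketch of what a merged check could look like: one shared classification of the position/leader state, with the caller deciding whether a missing position is fatal (the old unbuffered path) or skippable (the new buffered path). The names below (PositionCheckSketch, checkPosition, the simplified FetchPosition) are illustrative only, not code from this PR.

```java
import java.util.Optional;

public class PositionCheckSketch {

    enum PositionCheck { USABLE, NO_POSITION, LEADER_UNKNOWN }

    // Stand-in for SubscriptionState.FetchPosition: just an offset plus optional leader info.
    static class FetchPosition {
        final long offset;
        final Optional<String> leader;

        FetchPosition(long offset, Optional<String> leader) {
            this.offset = offset;
            this.leader = leader;
        }
    }

    // Shared classification: NO_POSITION could throw IllegalStateException on the unbuffered
    // path but be skipped on the buffered path; LEADER_UNKNOWN would always mean
    // "log and request a metadata update".
    static PositionCheck checkPosition(FetchPosition position) {
        if (position == null)
            return PositionCheck.NO_POSITION;
        if (position.leader.isEmpty())
            return PositionCheck.LEADER_UNKNOWN;
        return PositionCheck.USABLE;
    }

    public static void main(String[] args) {
        System.out.println(checkPosition(null));                                    // NO_POSITION
        System.out.println(checkPosition(new FetchPosition(0, Optional.empty())));  // LEADER_UNKNOWN
        System.out.println(checkPosition(new FetchPosition(0, Optional.of("b1")))); // USABLE
    }
}
```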
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -456,6 +457,36 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
+    private Set<Integer> nodesWithBufferedPartitions(Set<TopicPartition> buffered, long currentTimeMs) {
+        Set<Integer> nodesWithBufferedPartitions = new HashSet<>();
+
+        for (TopicPartition partition : buffered) {
+            if (!subscriptions.isAssigned(partition)) {
Review Comment:
Can we add test cases for all of the partitions we skip?
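Something along these lines, for example. This is a self-contained sketch of the skip rule in isolation rather than actual FetcherTest code (the real tests would go through the existing fetcher/subscription plumbing); BufferedNodeResolverTest and its helper are hypothetical stand-ins.

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import org.junit.jupiter.api.Test;

class BufferedNodeResolverTest {

    // Stand-in for the logic under review: map buffered partitions to their leader's node id,
    // skipping partitions the consumer no longer owns or whose leader is unknown.
    static Set<Integer> nodesWithBufferedPartitions(Set<String> buffered,
                                                    Predicate<String> isAssigned,
                                                    Map<String, Integer> leaders) {
        Set<Integer> nodes = new HashSet<>();
        for (String partition : buffered) {
            if (!isAssigned.test(partition))
                continue;                    // no longer assigned: skipped, as in the PR
            Integer leader = leaders.get(partition);
            if (leader == null)
                continue;                    // leader unknown: also skipped
            nodes.add(leader);
        }
        return nodes;
    }

    @Test
    void unassignedBufferedPartitionIsSkipped() {
        Set<String> buffered = Set.of("topic-0", "topic-1");
        Set<String> assigned = Set.of("topic-1");                  // topic-0 was revoked
        Map<String, Integer> leaders = Map.of("topic-0", 0, "topic-1", 1);

        Set<Integer> nodes = nodesWithBufferedPartitions(buffered, assigned::contains, leaders);

        assertEquals(Set.of(1), nodes);      // node 0 is absent because topic-0 was skipped
    }

    @Test
    void bufferedPartitionWithUnknownLeaderIsSkipped() {
        Set<String> buffered = Set.of("topic-0");
        Map<String, Integer> leaders = Map.of();                   // no leader known for topic-0

        assertEquals(Set.of(), nodesWithBufferedPartitions(buffered, p -> true, leaders));
    }
}
```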
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,7 +388,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
         Map<String, Uuid> topicIds = metadata.topicIds();
-        for (TopicPartition partition : fetchablePartitions()) {
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = Set.copyOf(subscriptions.fetchablePartitions(tp -> !buffered.contains(tp)));
+
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
Review Comment:
To confirm: after we return here, prepareFetchRequests() would be invoked again on the next poll()?
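To picture the flow being asked about, a deliberately simplified, hypothetical sketch (not the consumer's actual classes): request preparation is re-run on every poll(), so returning an empty map here only skips the current round.

```java
import java.util.Collections;
import java.util.Map;

public class FetchDriverSketch {

    // Stand-in for AbstractFetch.prepareFetchRequests(): returns an empty map when every
    // fetchable partition already has buffered data.
    Map<Integer, Object> prepareFetchRequests(boolean everythingBuffered) {
        if (everythingBuffered)
            return Collections.emptyMap();   // nothing to send this round
        return Map.of(0, new Object());      // pretend one request destined for node 0
    }

    // Stand-in for the consumer's poll-driven send path: it runs again on every poll(),
    // so a skipped round is simply retried the next time the application polls.
    void poll(boolean everythingBuffered) {
        prepareFetchRequests(everythingBuffered)
                .forEach((node, request) -> System.out.println("sending fetch to node " + node));
    }

    public static void main(String[] args) {
        FetchDriverSketch driver = new FetchDriverSketch();
        driver.poll(true);   // all partitions buffered: no fetch is sent
        driver.poll(false);  // buffer drained: a fetch goes out on the next poll
    }
}
```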
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -3757,28 +3764,6 @@ private FetchResponse fetchResponse(TopicIdPartition tp, MemoryRecords records,
         return FetchResponse.of(Errors.NONE, throttleTime, INVALID_SESSION_ID, new LinkedHashMap<>(partitions));
     }
-    private FetchResponse fetchResponse2(TopicIdPartition tp1, MemoryRecords records1, long hw1,
-                                         TopicIdPartition tp2, MemoryRecords records2, long hw2) {
-        Map<TopicIdPartition, FetchResponseData.PartitionData> partitions = new HashMap<>();
-        partitions.put(tp1,
-                new FetchResponseData.PartitionData()
-                        .setPartitionIndex(tp1.topicPartition().partition())
-                        .setErrorCode(Errors.NONE.code())
-                        .setHighWatermark(hw1)
-                        .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                        .setLogStartOffset(0)
-                        .setRecords(records1));
-        partitions.put(tp2,
-                new FetchResponseData.PartitionData()
-                        .setPartitionIndex(tp2.topicPartition().partition())
-                        .setErrorCode(Errors.NONE.code())
-                        .setHighWatermark(hw2)
-                        .setLastStableOffset(FetchResponse.INVALID_LAST_STABLE_OFFSET)
-                        .setLogStartOffset(0)
-                        .setRecords(records2));
-        return FetchResponse.of(Errors.NONE, 0, INVALID_SESSION_ID, new LinkedHashMap<>(partitions));
-    }
-
     /**
      * Assert that the {@link Fetcher#collectFetch() latest fetch} does not contain any
      * {@link Fetch#records() user-visible records}, did not
Review Comment:
nit: prepareOffsetsForLeaderEpochResponse (L3623) is also unused. Can we remove it as well?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -408,7 +388,21 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests()
         long currentTimeMs = time.milliseconds();
        Map<String, Uuid> topicIds = metadata.topicIds();
-        for (TopicPartition partition : fetchablePartitions()) {
+        // This is the set of partitions that have buffered data
+        Set<TopicPartition> buffered = Collections.unmodifiableSet(fetchBuffer.bufferedPartitions());
+
+        // This is the set of partitions that do not have buffered data
+        Set<TopicPartition> unbuffered = Set.copyOf(subscriptions.fetchablePartitions(tp -> !buffered.contains(tp)));
+
+        if (unbuffered.isEmpty()) {
+            // If there are no partitions that don't already have data locally buffered, there's no need to issue
+            // any fetch requests at the present time.
+            return Collections.emptyMap();
+        }
+
+        Set<Integer> nodesWithBufferedPartitions = nodesWithBufferedPartitions(buffered, currentTimeMs);
Review Comment:
Makes sense. Can you create one?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]