adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043830088
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -152,58 +176,68 @@ public void onExpiration() {
    @Override
    public void onComplete() {
        // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables -
-        // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
+        // partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread.
        lock.lock();
        log.trace("Completing the delayed share fetch request for group {}, member {}, "
            + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(),
            partitionsAcquired.keySet());
        try {
-            LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
-            // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
-            if (partitionsAcquired.isEmpty()) {
-                topicPartitionData = acquirablePartitions();
-                // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
-                // for the share partition, hence if no partitions are yet acquired by tryComplete,
-                // we record the metric here. Do not check if the request has successfully acquired any
-                // partitions now or not, as then the upper bound of request timeout shall be recorded
-                // for the metric.
-                updateAcquireElapsedTimeMetric();
+            if (remoteStorageFetchException.isPresent()) {
+                completeErroneousRemoteShareFetchRequest();
+            } else if (remoteFetchOpt.isPresent()) {
+                completeRemoteStorageShareFetchRequest();
            } else {
-                // tryComplete invoked forceComplete, so we can use the data from tryComplete.
-                topicPartitionData = partitionsAcquired;
+                completeLocalLogShareFetchRequest();
            }
-
-            if (topicPartitionData.isEmpty()) {
-                // No locks for share partitions could be acquired, so we complete the request with an empty response.
-                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
-                shareFetch.maybeComplete(Map.of());
-                return;
-            } else {
-                // Update metric to record acquired to requested partitions.
-                double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
-                shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
-            }
-            log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
-                topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
-
-            completeShareFetchRequest(topicPartitionData);
        } finally {
            lock.unlock();
        }
    }

-    private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
+    private void completeLocalLogShareFetchRequest() {
+        LinkedHashMap<TopicIdPartition, Long> topicPartitionData;
+        // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch.
+        if (partitionsAcquired.isEmpty()) {
+            topicPartitionData = acquirablePartitions(sharePartitions);
+            // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks
+            // for the share partition, hence if no partitions are yet acquired by tryComplete,
+            // we record the metric here. Do not check if the request has successfully acquired any
+            // partitions now or not, as then the upper bound of request timeout shall be recorded
+            // for the metric.
+            updateAcquireElapsedTimeMetric();
+        } else {
+            // tryComplete invoked forceComplete, so we can use the data from tryComplete.
+            topicPartitionData = partitionsAcquired;
+        }
+
+        if (topicPartitionData.isEmpty()) {
+            // No locks for share partitions could be acquired, so we complete the request with an empty response.
+            shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0);
+            shareFetch.maybeComplete(Map.of());
+            return;
+        } else {
+            // Update metric to record acquired to requested partitions.
+            double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size();
+            shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100));
+        }
+        log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}",
+            topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams());
+
+        processAcquiredTopicPartitions(topicPartitionData);
+    }
+
+    private void processAcquiredTopicPartitions(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
Review Comment:
I have renamed the function to `processAcquiredTopicPartitionsForLocalLogFetch`
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -242,8 +268,14 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top
     */
    @Override
    public boolean tryComplete() {
-        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions();
+        // Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.
+        // This will help to prevent starving remote storage partitions and wasting the significant upfront work involved with
+        // kicking off a fetch from remote storage.
Review Comment:
The point I was trying to convey was that a remote fetch takes more resources than a local log fetch. So, if we've identified that a given share fetch request has a remote storage topic partitions fetch in flight, we should prioritize it over the local log fetch. I guess the word "starve" makes it a little unclear. I've changed the comment to `Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first.`
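
To make that ordering concrete, here is a minimal toy sketch of the prioritization described above. It is not the `DelayedShareFetch` implementation: the class, field, and helper names below (`FetchPriorityExample`, `inFlightRemoteFetch`, `tryLocalFetch`) are invented for illustration, and only the decision order mirrors the review comment, i.e. resolve an in-flight remote fetch before attempting new local log work.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Illustrative only: a toy model of the prioritization the reviewer describes.
// None of these names come from DelayedShareFetch; they exist solely to show
// "resolve the in-flight remote fetch before attempting a local log fetch".
public class FetchPriorityExample {

    // Stand-in for a pending remote storage fetch tracked by the delayed operation.
    private Optional<CompletableFuture<String>> inFlightRemoteFetch = Optional.empty();

    public boolean tryComplete() {
        // If a remote fetch is already in flight, check it first rather than starting
        // new local log work: the remote fetch is the more expensive operation, so
        // abandoning it would waste the upfront cost already paid to kick it off.
        if (inFlightRemoteFetch.isPresent()) {
            return inFlightRemoteFetch.get().isDone();
        }
        // Otherwise fall back to attempting a (cheaper) local log fetch.
        return tryLocalFetch();
    }

    private boolean tryLocalFetch() {
        // Placeholder for acquiring share partition locks and reading local data.
        return false;
    }

    public static void main(String[] args) {
        FetchPriorityExample example = new FetchPriorityExample();
        // Prints "false": no remote fetch in flight and the local fetch acquires nothing.
        System.out.println(example.tryComplete());
    }
}
```

The intuition, as in the original code comment, is that the remote read has already paid a significant upfront cost, so completing it first avoids throwing that work away, while a local log fetch can always be attempted cheaply afterwards.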