junrao commented on code in PR #17534:
URL: https://github.com/apache/kafka/pull/17534#discussion_r1815560941
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -611,20 +564,9 @@ void maybeProcessFetchQueue() {
// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, this), delayedShareFetchWatchKeys);
-
- // Release the lock so that other threads can process the queue.
- releaseProcessFetchQueueLock();
- // If there are more requests in the queue, then process them.
- if (!fetchQueue.isEmpty())
- maybeProcessFetchQueue();
-
} catch (Exception e) {
// In case exception occurs then release the locks so queue can be further processed.
log.error("Error processing fetch queue for share partitions", e);
- releaseProcessFetchQueueLock();
Review Comment:
Thanks. Also, the Javadoc for `SharePartition.maybeAcquireFetchLock` still refers to the fetch queue:
```
/**
* Prior to fetching records from the leader, the fetch lock is acquired to ensure that the same
* share partition does not enter a fetch queue while another one is being fetched within the queue.
* The fetch lock is released once the records are fetched from the leader.
*
```
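For illustration only, here is a minimal sketch of how that comment might read with the fetch queue wording dropped. The class name `FetchLockSketch`, the `AtomicBoolean` field, and the `releaseFetchLock` method are assumptions made for this example and are not taken from the PR; the actual `SharePartition` code may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the fetch lock portion of SharePartition; not the actual Kafka code.
class FetchLockSketch {
    // Assumption: the lock is a simple flag that is true while a fetch is in flight.
    private final AtomicBoolean fetchLock = new AtomicBoolean(false);

    /**
     * Prior to fetching records from the leader, the fetch lock is acquired to ensure that the same
     * share partition is not fetched concurrently by another request.
     * The fetch lock is released once the records are fetched from the leader.
     *
     * @return true if the lock was acquired, false if a fetch is already in progress.
     */
    boolean maybeAcquireFetchLock() {
        // Atomically flips the flag from false to true; fails if it is already true.
        return fetchLock.compareAndSet(false, true);
    }

    /** Releases the fetch lock so that a later fetch can acquire it. */
    void releaseFetchLock() {
        fetchLock.set(false);
    }
}
```

The point of the sketch is only the wording: the comment describes mutual exclusion between concurrent fetches on one share partition and no longer mentions a queue.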