junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2052746487
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +316,32 @@ public boolean tryComplete() {
return false;
} catch (Exception e) {
log.error("Error processing delayed share fetch request", e);
- releasePartitionLocks(topicPartitionData.keySet());
- partitionsAcquired.clear();
- partitionsAlreadyFetched.clear();
- return forceComplete();
+ // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+ // local log read. We do not release locks for partitions which have a remote storage read because we need to
+ // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+ // request might get the locks for those partitions without this one getting complete.
+ if (remoteStorageFetchException.isEmpty()) {
+ releasePartitionLocks(topicPartitionData.keySet());
+ partitionsAcquired.clear();
+ localPartitionsAlreadyFetched.clear();
+ return forceComplete();
Review Comment:
Should we just call `forceCompleteRequest()` here for consistency? We
could optimize `releasePartitionLocksAndAddToActionQueue()` so that it doesn't
add to the action queue if the acquired partition set is empty.
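A minimal sketch of what the suggestion could look like, under stated assumptions. `ReleaseLocksSketch`, `acquire()`, the `String` partition keys and the `Runnable`-based action queue are hypothetical stand-ins. The method bodies do not reproduce Kafka's actual `DelayedShareFetch`, and the remote-fetch branch is not modeled. The sketch shows the error path delegating to a single `forceCompleteRequest()`, with `releasePartitionLocksAndAddToActionQueue()` returning early when no partitions were acquired.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the review suggestion; names and types are illustrative,
// not Kafka's DelayedShareFetch.
class ReleaseLocksSketch {
    // Stand-in for the broker's action queue: work to run after the request completes.
    private final List<Runnable> actionQueue = new ArrayList<>();
    // Partitions whose fetch locks this request currently holds.
    private final Set<String> partitionsAcquired = new LinkedHashSet<>();
    // Records every partition whose lock was released, for inspection.
    private final Set<String> releasedLocks = new LinkedHashSet<>();

    void acquire(String topicPartition) {
        partitionsAcquired.add(topicPartition);
    }

    private void releasePartitionLocks(Set<String> topicPartitions) {
        releasedLocks.addAll(topicPartitions);
    }

    // Suggested optimization: enqueue nothing when the acquired set is empty.
    void releasePartitionLocksAndAddToActionQueue(Set<String> topicPartitions) {
        if (topicPartitions.isEmpty()) {
            return;
        }
        releasePartitionLocks(topicPartitions);
        // Placeholder for follow-up work on the released partitions.
        actionQueue.add(() -> { });
    }

    // Single completion path that the catch block in tryComplete() could reuse.
    boolean forceCompleteRequest() {
        // Copy before clearing so the release still sees the acquired partitions.
        Set<String> toRelease = new LinkedHashSet<>(partitionsAcquired);
        partitionsAcquired.clear();
        releasePartitionLocksAndAddToActionQueue(toRelease);
        return true; // stands in for the result of forceComplete()
    }

    int actionQueueSize() { return actionQueue.size(); }

    Set<String> releasedLocks() { return releasedLocks; }

    public static void main(String[] args) {
        ReleaseLocksSketch empty = new ReleaseLocksSketch();
        empty.forceCompleteRequest();
        System.out.println(empty.actionQueueSize()); // prints "0"

        ReleaseLocksSketch held = new ReleaseLocksSketch();
        held.acquire("topic-0");
        held.forceCompleteRequest();
        System.out.println(held.actionQueueSize() + " " + held.releasedLocks()); // prints "1 [topic-0]"
    }
}
```

With nothing acquired, the shared completion path enqueues no action. With one partition held, its lock is released and exactly one action is queued, so the catch block no longer needs its own release-and-clear sequence.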