junrao commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2052697705
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -252,13 +291,19 @@ public boolean tryComplete() {
                 // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for
                 // those topic partitions.
                 LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData);
+                // Store the remote fetch info and the topic partition for which we need to perform remote fetch.
+                Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse);
+
+                if (topicPartitionRemoteFetchInfoOpt.isPresent()) {
+                    return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get());
+                }
                 maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse);
                 if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) {
                     partitionsAcquired = topicPartitionData;
-                    partitionsAlreadyFetched = replicaManagerReadResponse;
+                    localPartitionsAlreadyFetched = replicaManagerReadResponse;
                     boolean completedByMe = forceComplete();
                     // If invocation of forceComplete is not successful, then that means the request is already completed
-                    // hence release the acquired locks.
+                    // hence the acquired locks are already released.
Review Comment:
This comment is still not quite accurate. If `forceComplete()` returns
false, it actually means that the locks have not been released yet. How about
the following?
`If the delayed operation is completed by me, the acquired locks are already
released in onComplete(). Otherwise, need to release the acquired locks.`
Also, we call `forceComplete()` in 4 different places and there is quite a
bit of duplicated code and comments. Could we introduce a private method and
reuse it in all those places?
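To make the suggestion concrete, here is a minimal, self-contained sketch of such a helper. All names (`DelayedFetchSketch`, `completeOrReleaseLocks`, the stand-in `releasePartitionLocks`) are hypothetical stand-ins for the real `DelayedShareFetch` internals; only the first-caller-wins semantics of `forceComplete()` are modeled, not Kafka's actual purgatory machinery:

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal stand-in for DelayedShareFetch, used only to illustrate the
// suggested private helper around forceComplete().
class DelayedFetchSketch {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    boolean locksReleased = false;

    // Stand-in for DelayedOperation.forceComplete(): only the first caller
    // transitions the operation to completed and runs onComplete().
    boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    // On the completing path, onComplete() releases the acquired locks.
    void onComplete() {
        releasePartitionLocks(Set.of("tp-0"));
    }

    void releasePartitionLocks(Set<String> partitions) {
        locksReleased = true;
    }

    // The suggested private helper: call forceComplete() once, and if some
    // other thread already completed the operation, release this thread's
    // acquired locks here instead.
    boolean completeOrReleaseLocks(Set<String> acquiredPartitions) {
        boolean completedByMe = forceComplete();
        // If the delayed operation is completed by me, the acquired locks are
        // already released in onComplete(). Otherwise, release them here.
        if (!completedByMe) {
            releasePartitionLocks(acquiredPartitions);
        }
        return completedByMe;
    }
}
```

With a helper like this, the four call sites collapse to a single `completeOrReleaseLocks(...)` call, and the tricky comment about who releases the locks lives in exactly one place.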
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -277,22 +322,39 @@ public boolean tryComplete() {
                 return false;
             } catch (Exception e) {
                 log.error("Error processing delayed share fetch request", e);
-                releasePartitionLocks(topicPartitionData.keySet());
-                partitionsAcquired.clear();
-                partitionsAlreadyFetched.clear();
-                return forceComplete();
+                // In case we have a remote fetch exception, we have already released locks for partitions which have potential
+                // local log read. We do not release locks for partitions which have a remote storage read because we need to
+                // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch
+                // request might get the locks for those partitions without this one getting complete.
+                if (remoteStorageFetchException.isEmpty()) {
+                    releasePartitionLocks(topicPartitionData.keySet());
+                    partitionsAcquired.clear();
+                    localPartitionsAlreadyFetched.clear();
+                    return forceComplete();
+                } else {
+                    boolean completedByMe = forceComplete();
+                    // If invocation of forceComplete is not successful, then that means the request is already completed
+                    // hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that
+                    // completes the operation (due to expiration) right before a different thread is about to enter tryComplete.
+                    if (!completedByMe) {
+                        releasePartitionLocks(partitionsAcquired.keySet());
Review Comment:
When we release the locks here, should we trigger a purgatory check since
some other pending delayed operations may be waiting on the locks?
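The concern here can be illustrated with a toy model. The sketch below is not Kafka code: `PurgatorySketch`, `DelayedOp`, and `releaseLockAndCheck` are invented names, and it only models the idea that delayed operations watched in a purgatory stay pending until something re-checks them (as Kafka's `DelayedOperationPurgatory.checkAndComplete` does) after the blocking state changes:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy purgatory: delayed operations wait until a lock becomes available.
class PurgatorySketch {
    interface DelayedOp {
        boolean tryComplete();
    }

    private final Deque<DelayedOp> watching = new ArrayDeque<>();
    boolean lockHeld = true;

    void watch(DelayedOp op) {
        watching.add(op);
    }

    // Analogue of a purgatory check: retry the watched operations now that
    // state (here, the lock) may have changed, removing those that complete.
    int checkAndComplete() {
        int completed = 0;
        for (var it = watching.iterator(); it.hasNext(); ) {
            if (it.next().tryComplete()) {
                it.remove();
                completed++;
            }
        }
        return completed;
    }

    // Releasing the lock without a purgatory check would leave waiters
    // pending until their timeout; checking immediately completes them now.
    int releaseLockAndCheck() {
        lockHeld = false;
        return checkAndComplete();
    }
}
```

In this model, an operation watched while the lock is held only completes once a check runs after the release, which is the scenario the review comment is asking about.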
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]