adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043841030
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
return lock;
}
+ // Visible for testing.
+ RemoteFetch remoteFetch() {
+ return remoteFetchOpt.orElse(null);
+ }
+
// Visible for testing.
Meter expiredRequestMeter() {
return expiredRequestMeter;
}
+
+ private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
maybePrepareRemoteStorageFetchInfo(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) {
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+ replicaManagerReadResponse.forEach((topicIdPartition, logReadResult)
-> {
+ if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+ remoteStorageFetchMetadataMap.put(topicIdPartition,
logReadResult.info().delayedRemoteStorageFetch.get());
+ partitionsAcquired.put(topicIdPartition,
topicPartitionData.get(topicIdPartition));
+ localPartitionsAlreadyFetched.put(topicIdPartition,
logReadResult);
+ }
+ });
+ return remoteStorageFetchMetadataMap;
+ }
+
+ private boolean maybeProcessRemoteFetch(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchInfoMap,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) throws Exception {
+ // topic partitions for which fetching would be happening from local
log and not remote storage.
+ Set<TopicIdPartition> localFetchTopicPartitions = new
LinkedHashSet<>();
+ topicPartitionData.keySet().forEach(topicIdPartition -> {
+ if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+ localFetchTopicPartitions.add(topicIdPartition);
+ }
+ });
+ // Release acquisition lock for the topic partitions that were
acquired but were not a part of remote fetch.
+ releasePartitionLocks(localFetchTopicPartitions);
+ Optional<Exception> exceptionOpt =
processRemoteFetchOrException(remoteStorageFetchInfoMap,
replicaManagerReadResponse);
+ if (exceptionOpt.isPresent()) {
+ remoteStorageFetchException = exceptionOpt;
+ throw exceptionOpt.get();
+ }
+ // Check if remote fetch can be completed.
+ return maybeCompletePendingRemoteFetch();
Review Comment:
hmm, `tryComplete()` should return true/false depending on whether we can
move to `forceComplete()` or not. In case we decide not to call
`maybeCompletePendingRemoteFetch()`, we will have to change this line to
`return false`. I don't think that's an ideal behaviour. Your thoughts @junrao ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]