adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2043830522
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
return lock;
}
+ // Visible for testing.
+ RemoteFetch remoteFetch() {
+ return remoteFetchOpt.orElse(null);
+ }
+
// Visible for testing.
Meter expiredRequestMeter() {
return expiredRequestMeter;
}
+
+ private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
maybePrepareRemoteStorageFetchInfo(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) {
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+ replicaManagerReadResponse.forEach((topicIdPartition, logReadResult)
-> {
+ if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+ remoteStorageFetchMetadataMap.put(topicIdPartition,
logReadResult.info().delayedRemoteStorageFetch.get());
+ partitionsAcquired.put(topicIdPartition,
topicPartitionData.get(topicIdPartition));
+ localPartitionsAlreadyFetched.put(topicIdPartition,
logReadResult);
+ }
+ });
+ return remoteStorageFetchMetadataMap;
+ }
+
+ private boolean maybeProcessRemoteFetch(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchInfoMap,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) throws Exception {
+ // topic partitions for which fetching would be happening from local
log and not remote storage.
+ Set<TopicIdPartition> localFetchTopicPartitions = new
LinkedHashSet<>();
+ topicPartitionData.keySet().forEach(topicIdPartition -> {
+ if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+ localFetchTopicPartitions.add(topicIdPartition);
+ }
+ });
+ // Release acquisition lock for the topic partitions that were
acquired but were not a part of remote fetch.
+ releasePartitionLocks(localFetchTopicPartitions);
+ Optional<Exception> exceptionOpt =
processRemoteFetchOrException(remoteStorageFetchInfoMap,
replicaManagerReadResponse);
+ if (exceptionOpt.isPresent()) {
+ remoteStorageFetchException = exceptionOpt;
+ throw exceptionOpt.get();
+ }
+ // Check if remote fetch can be completed.
+ return maybeCompletePendingRemoteFetch();
+ }
+
+ /**
+ * Returns an option containing an exception if a task for
RemoteStorageFetchInfo could not be scheduled successfully else returns empty
optional.
+ * @param remoteStorageFetchInfoMap - The topic partition to remote
storage fetch info map
+ */
+ private Optional<Exception> processRemoteFetchOrException(
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchInfoMap,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
Review Comment:
Sorry, I missed the parameter `replicaManagerReadResponse`; I've added it
now. Thanks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]