adixitconfluent commented on code in PR #19437:
URL: https://github.com/apache/kafka/pull/19437#discussion_r2044029412
##########
core/src/main/java/kafka/server/share/DelayedShareFetch.java:
##########
@@ -529,8 +575,315 @@ Lock lock() {
return lock;
}
+ // Visible for testing.
+ RemoteFetch remoteFetch() {
+ return remoteFetchOpt.orElse(null);
+ }
+
// Visible for testing.
/**
 * @return the {@code expiredRequestMeter} field of this delayed share fetch
 */
Meter expiredRequestMeter() {
    return this.expiredRequestMeter;
}
+
+ private LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
maybePrepareRemoteStorageFetchInfo(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) {
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchMetadataMap = new LinkedHashMap<>();
+ replicaManagerReadResponse.forEach((topicIdPartition, logReadResult)
-> {
+ if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) {
+ remoteStorageFetchMetadataMap.put(topicIdPartition,
logReadResult.info().delayedRemoteStorageFetch.get());
+ partitionsAcquired.put(topicIdPartition,
topicPartitionData.get(topicIdPartition));
+ localPartitionsAlreadyFetched.put(topicIdPartition,
logReadResult);
+ }
+ });
+ return remoteStorageFetchMetadataMap;
+ }
+
+ private boolean maybeProcessRemoteFetch(
+ LinkedHashMap<TopicIdPartition, Long> topicPartitionData,
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchInfoMap,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) throws Exception {
+ // topic partitions for which fetching would be happening from local
log and not remote storage.
+ Set<TopicIdPartition> localFetchTopicPartitions = new
LinkedHashSet<>();
+ topicPartitionData.keySet().forEach(topicIdPartition -> {
+ if (!remoteStorageFetchInfoMap.containsKey(topicIdPartition)) {
+ localFetchTopicPartitions.add(topicIdPartition);
+ }
+ });
+ // Release acquisition lock for the topic partitions that were
acquired but were not a part of remote fetch.
+ releasePartitionLocks(localFetchTopicPartitions);
+ Optional<Exception> exceptionOpt =
processRemoteFetchOrException(remoteStorageFetchInfoMap,
replicaManagerReadResponse);
+ if (exceptionOpt.isPresent()) {
+ remoteStorageFetchException = exceptionOpt;
+ throw exceptionOpt.get();
+ }
+ // Check if remote fetch can be completed.
+ return maybeCompletePendingRemoteFetch();
+ }
+
+ /**
+ * Returns an option containing an exception if a task for
RemoteStorageFetchInfo could not be scheduled successfully else returns empty
optional.
+ * @param remoteStorageFetchInfoMap - The topic partition to remote
storage fetch info map
+ */
+ private Optional<Exception> processRemoteFetchOrException(
+ LinkedHashMap<TopicIdPartition, RemoteStorageFetchInfo>
remoteStorageFetchInfoMap,
+ LinkedHashMap<TopicIdPartition, LogReadResult>
replicaManagerReadResponse
+ ) {
+ // TODO: There is a limitation in remote storage fetch for consumer
groups that we can only perform remote fetch for
+ // a single topic partition in a fetch request. Since, the logic of
fetch is largely based on how consumer groups work,
+ // we are following the same logic. However, this problem should be
addressed as part of KAFKA-19133 which should help us perform
+ // fetch for multiple remote fetch topic partition in a single share
fetch request
+ TopicIdPartition remoteFetchTopicIdPartition =
getRemoteFetchTopicIdPartition(remoteStorageFetchInfoMap);
+ RemoteStorageFetchInfo remoteStorageFetchInfo =
remoteStorageFetchInfoMap.get(remoteFetchTopicIdPartition);
+
+ LinkedHashMap<TopicIdPartition, LogOffsetMetadata>
fetchOffsetMetadataMap = new LinkedHashMap<>();
+ remoteStorageFetchInfoMap.forEach((topicIdPartition, logReadResult) ->
fetchOffsetMetadataMap.put(
+ topicIdPartition,
+
replicaManagerReadResponse.get(topicIdPartition).info().fetchOffsetMetadata
+ ));
+
+ Future<Void> remoteFetchTask;
+ CompletableFuture<RemoteLogReadResult> remoteFetchResult = new
CompletableFuture<>();
+ try {
+ remoteFetchTask =
replicaManager.remoteLogManager().get().asyncRead(
+ remoteStorageFetchInfo,
+ result -> {
+ remoteFetchResult.complete(result);
+ replicaManager.completeDelayedShareFetchRequest(new
DelayedShareFetchGroupKey(shareFetch.groupId(),
remoteFetchTopicIdPartition.topicId(),
remoteFetchTopicIdPartition.partition()));
+ }
+ );
+ } catch (RejectedExecutionException e) {
+ // Return the error if any in scheduling the remote fetch task.
+ log.warn("Unable to fetch data from remote storage", e);
+ return Optional.of(e);
+ } catch (Exception e) {
+ return Optional.of(e);
+ }
+ remoteFetchOpt = Optional.of(new
RemoteFetch(remoteFetchTopicIdPartition, remoteFetchTask, remoteFetchResult,
remoteStorageFetchInfo, fetchOffsetMetadataMap));
+ return Optional.empty();
+ }
+
+ /**
+ * This function returns the first topic partition for which we need to
perform remote storage fetch. We remove all the
+ * other partitions that can have a remote storage fetch for further
processing and release the fetch locks for them.
+ * @param remoteStorageFetchInfoMap map containing topic partition to
remote storage fetch information.
+ * @return the first topic partition for which we need to perform remote
storage fetch
+ */
+ private TopicIdPartition
getRemoteFetchTopicIdPartition(LinkedHashMap<TopicIdPartition,
RemoteStorageFetchInfo> remoteStorageFetchInfoMap) {
+ Map.Entry<TopicIdPartition, RemoteStorageFetchInfo>
firstRemoteStorageFetchInfo =
remoteStorageFetchInfoMap.entrySet().iterator().next();
+ TopicIdPartition remoteFetchTopicIdPartition =
firstRemoteStorageFetchInfo.getKey();
+ remoteStorageFetchInfoMap.keySet().forEach(topicIdPartition -> {
+ if (!topicIdPartition.equals(remoteFetchTopicIdPartition)) {
+ partitionsAcquired.remove(topicIdPartition);
+ releasePartitionLocks(Set.of(topicIdPartition));
+ }
+ });
+ return remoteFetchTopicIdPartition;
+ }
+
+ /**
+ * This function checks if the remote fetch can be completed or not. It
should always be called once you confirm remoteFetchOpt.isPresent().
+ * The operation can be completed if:
+ * Case a: The partition is in an offline log directory on this broker
+ * Case b: This broker does not know the partition it tries to fetch
+ * Case c: This broker is no longer the leader of the partition it tries
to fetch
+ * Case d: The remote storage read request completed (succeeded or failed)
+ * @return boolean representing whether the remote fetch is completed or
not.
+ */
+ private boolean maybeCompletePendingRemoteFetch() {
+ boolean canComplete = false;
+
+ for (Map.Entry<TopicIdPartition, LogOffsetMetadata> entry :
remoteFetchOpt.get().fetchOffsetMetadataMap().entrySet()) {
+ TopicIdPartition topicIdPartition = entry.getKey();
+ LogOffsetMetadata fetchOffsetMetadata = entry.getValue();
+ try {
+ if (fetchOffsetMetadata !=
LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+
replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
+ }
+ } catch (KafkaStorageException e) { // Case a
+ log.debug("TopicPartition {} is in an offline log directory,
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+ canComplete = true;
+ } catch (UnknownTopicOrPartitionException e) { // Case b
+ log.debug("Broker no longer knows of topicPartition {},
satisfy {} immediately", topicIdPartition, shareFetch.fetchParams());
+ canComplete = true;
+ } catch (NotLeaderOrFollowerException e) { // Case c
+ log.debug("Broker is no longer the leader or follower of
topicPartition {}, satisfy {} immediately", topicIdPartition,
shareFetch.fetchParams());
+ canComplete = true;
+ }
+ if (canComplete)
+ break;
+ }
+
+ if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone())
{ // Case d
+ boolean completedByMe = forceComplete();
+ // If invocation of forceComplete is not successful, then that
means the request is already completed
+ // hence release the acquired locks.
+ if (!completedByMe) {
+ releasePartitionLocks(partitionsAcquired.keySet());
+ }
+ return completedByMe;
+ } else
+ return false;
+ }
+
+ /**
+ * This function completes a share fetch request for which we have
identified erroneous remote storage fetch in tryComplete()
+ * It should only be called when we know that there is remote fetch
in-flight/completed.
+ */
+ private void completeErroneousRemoteShareFetchRequest() {
+ try {
+ handleFetchException(shareFetch, partitionsAcquired.keySet(),
remoteStorageFetchException.get());
+ } finally {
+
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
+ }
+
+ }
+
+ private void
releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition>
topicIdPartitions) {
+ // Releasing the lock to move ahead with the next request in queue.
+ releasePartitionLocks(topicIdPartitions);
+ // If we have a fetch request completed for a topic-partition, we
release the locks for that partition,
+ // then we should check if there is a pending share fetch request for
the topic-partition and complete it.
+ // We add the action to delayed actions queue to avoid an infinite
call stack, which could happen if
+ // we directly call delayedShareFetchPurgatory.checkAndComplete
+ replicaManager.addToActionQueue(() ->
topicIdPartitions.forEach(topicIdPartition ->
+ replicaManager.completeDelayedShareFetchRequest(
+ new DelayedShareFetchGroupKey(shareFetch.groupId(),
topicIdPartition.topicId(), topicIdPartition.partition()))));
+ }
+
+ /**
+ * This function completes a share fetch request for which we have
identified remoteFetch during tryComplete()
+ * Note - This function should only be called when we know that there is
remote fetch in-flight/completed.
+ */
+ private void completeRemoteStorageShareFetchRequest() {
+ LinkedHashMap<TopicIdPartition, Long> nonRemoteFetchTopicPartitionData
= new LinkedHashMap<>();
+ try {
+ List<ShareFetchPartitionData> shareFetchPartitionData = new
ArrayList<>();
+ int readableBytes = 0;
+ if (remoteFetchOpt.get().remoteFetchResult().isDone()) {
+ RemoteFetch remoteFetch = remoteFetchOpt.get();
+ if (remoteFetch.remoteFetchResult().get().error.isPresent()) {
+ Throwable error =
remoteFetch.remoteFetchResult().get().error.get();
+ // If there is any error for the remote fetch topic
partition, we populate the error accordingly.
+ shareFetchPartitionData.add(
+ new ShareFetchPartitionData(
+ remoteFetch.topicIdPartition(),
+
partitionsAcquired.get(remoteFetch.topicIdPartition()),
+
ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
+ )
+ );
+ } else {
+ FetchDataInfo info =
remoteFetch.remoteFetchResult().get().fetchDataInfo.get();
+ TopicIdPartition topicIdPartition =
remoteFetch.topicIdPartition();
+ LogReadResult logReadResult =
localPartitionsAlreadyFetched.get(topicIdPartition);
+ shareFetchPartitionData.add(
+ new ShareFetchPartitionData(
+ topicIdPartition,
+
partitionsAcquired.get(remoteFetch.topicIdPartition()),
+ new FetchPartitionData(
+ logReadResult.error(),
+ logReadResult.highWatermark(),
+ logReadResult.leaderLogStartOffset(),
+ info.records,
+ Optional.empty(),
+ logReadResult.lastStableOffset().isDefined() ?
OptionalLong.of((Long) logReadResult.lastStableOffset().get()) :
OptionalLong.empty(),
+ info.abortedTransactions,
+
logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer)
logReadResult.preferredReadReplica().get()) : OptionalInt.empty(),
+ false
+ )
+ )
+ );
+ readableBytes += info.records.sizeInBytes();
+ }
+ } else {
+ cancelRemoteFetchTask();
+ }
+
+ // If remote fetch bytes < shareFetch.fetchParams().maxBytes,
then we will try for a local read.
+ if (readableBytes < shareFetch.fetchParams().maxBytes) {
+ // Get the local log read based topic partitions.
+ LinkedHashMap<TopicIdPartition, SharePartition>
nonRemoteFetchSharePartitions = new LinkedHashMap<>();
+ sharePartitions.forEach((topicIdPartition, sharePartition) -> {
+ if (!partitionsAcquired.containsKey(topicIdPartition) &&
!remoteFetchOpt.get().fetchOffsetMetadataMap().containsKey(topicIdPartition)) {
+ nonRemoteFetchSharePartitions.put(topicIdPartition,
sharePartition);
+ }
+ });
+ nonRemoteFetchTopicPartitionData =
acquirablePartitions(nonRemoteFetchSharePartitions);
+ if (!nonRemoteFetchTopicPartitionData.isEmpty()) {
+ log.trace("Fetchable local share partitions for a remote
share fetch request data: {} with groupId: {} fetch params: {}",
+ nonRemoteFetchTopicPartitionData,
shareFetch.groupId(), shareFetch.fetchParams());
+
+ LinkedHashMap<TopicIdPartition, LogReadResult>
responseData = readFromLog(
+ nonRemoteFetchTopicPartitionData,
+
partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes -
readableBytes, nonRemoteFetchTopicPartitionData.keySet(),
nonRemoteFetchTopicPartitionData.size()));
+ for (Map.Entry<TopicIdPartition, LogReadResult> entry :
responseData.entrySet()) {
+ if
(entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
+ shareFetchPartitionData.add(
+ new ShareFetchPartitionData(
+ entry.getKey(),
+
nonRemoteFetchTopicPartitionData.get(entry.getKey()),
+
entry.getValue().toFetchPartitionData(false)
+ )
+ );
+ }
+ }
+ }
+ }
+
+ // Update metric to record acquired to requested partitions.
+ double requestTopicToAcquired = (double)
(partitionsAcquired.size() + nonRemoteFetchTopicPartitionData.size()) /
shareFetch.topicIdPartitions().size();
+ if (requestTopicToAcquired > 0)
+
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int)
(requestTopicToAcquired * 100));
+
+ Map<TopicIdPartition, ShareFetchResponseData.PartitionData>
remoteFetchResponse = ShareFetchUtils.processFetchResponse(
+ shareFetch, shareFetchPartitionData, sharePartitions,
replicaManager, exceptionHandler);
+ shareFetch.maybeComplete(remoteFetchResponse);
+ log.trace("Remote share fetch request completed successfully,
response: {}", remoteFetchResponse);
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ log.error("Error processing delayed share fetch request", e);
+ Set<TopicIdPartition> topicIdPartitions = new
LinkedHashSet<>(partitionsAcquired.keySet());
+
topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet());
+ handleFetchException(shareFetch, topicIdPartitions, e);
+ } finally {
+ Set<TopicIdPartition> topicIdPartitions = new
LinkedHashSet<>(partitionsAcquired.keySet());
+
topicIdPartitions.addAll(nonRemoteFetchTopicPartitionData.keySet());
+ releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
+ }
+ }
+
+ /**
+ * Cancel the remote storage read task, if it has not been executed yet
and avoid interrupting the task if it is
+ * already running as it may force closing opened/cached resources as
transaction index.
+ * Note - This function should only be called when we know that there is a
remote fetch in-flight/completed.
+ */
+ private void cancelRemoteFetchTask() {
+ boolean cancelled =
remoteFetchOpt.get().remoteFetchTask().cancel(false);
+ if (!cancelled) {
+ log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could
not be cancelled and its isDone value is {}",
+ remoteFetchOpt.get().remoteFetchInfo(),
remoteFetchOpt.get().remoteFetchTask().isDone());
+ }
+ }
+
+ public record RemoteFetch(
+ TopicIdPartition topicIdPartition,
+ Future<Void> remoteFetchTask,
+ CompletableFuture<RemoteLogReadResult> remoteFetchResult,
+ RemoteStorageFetchInfo remoteFetchInfo,
+ LinkedHashMap<TopicIdPartition, LogOffsetMetadata>
fetchOffsetMetadataMap
Review Comment:
We no longer need to store `LogOffsetMetadata`, so I have removed
`fetchOffsetMetadataMap`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]