junrao commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1164654776


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
   In line 1082, we should further test `!remoteFetchInfo.isPresent`, right?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);

Review Comment:
   We are calling `rlsMetadata.get()` multiple times. Could we get it once and reuse the result?
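
   A minimal sketch of the suggested change, hoisting the unwrapped value into a local (the local's name is illustrative):

   ```java
   // Unwrap the Optional once and reuse the result:
   RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadata.get();
   int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
   // ... the later call sites then reference the local instead of rlsMetadata.get():
   remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
   fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo);
   ```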



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                         RemoteLogSegmentMetadata segmentMetadata,
+                                         FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                    long upperBoundOffset,
+                                    RemoteLogSegmentMetadata segmentMetadata,
+                                    Consumer<List<AbortedTxn>> accumulator) throws RemoteStorageException {
+        TopicPartition topicPartition = segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)

Review Comment:
   We are calling `fetchLog.apply` multiple times when serving a single fetch request. Could we call it once and reuse the result?
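
   One possible shape for that reuse (a sketch only; threading the log through as an extra parameter is an assumption, not the PR's code):

   ```java
   // read() already holds the result of the single fetchLog.apply(tp) call,
   // so it could be passed down rather than re-applying the function here:
   private void collectAbortedTransactions(long startOffset,
                                           long upperBoundOffset,
                                           RemoteLogSegmentMetadata segmentMetadata,
                                           Consumer<List<AbortedTxn>> accumulator,
                                           Optional<UnifiedLog> logOptional) throws RemoteStorageException {
       Iterator<LogSegment> localLogSegments = logOptional
               .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
               .map(Collection::iterator)
               .orElse(Collections.emptyIterator());
       // ... rest of the method unchanged
   }
   ```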



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+

Review Comment:
   extra new line



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.log.remote.RemoteLogReadResult
+import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteStorageFetchInfo}
+
+import java.util.concurrent.{CompletableFuture, Future}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection._
+
+/**
+ * A remote fetch operation that can be created by the replica manager and watched
+ * in the remote fetch operation purgatory
+ */
+class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
+                         remoteFetchInfo: RemoteStorageFetchInfo,
+                         fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+                         fetchParams: FetchParams,
+                         localReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                         replicaManager: ReplicaManager,
+                         quota: ReplicaQuota,
+                         responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
+  extends DelayedOperation(fetchParams.maxWaitMs) {
+
+  /**
+   * The operation can be completed if:
+   *
+   * Case a: This broker is no longer the leader of the partition it tries to fetch
+   * Case b: This broker does not know the partition it tries to fetch
+   * Case c: The remote storage read request completed (succeeded or failed)
+   * Case d: The partition is in an offline log directory on this broker
+   * Case e: This broker is the leader, but the requested epoch is now fenced
+   *
+   * Upon completion, should return whatever data is available for each valid partition
+   */
+  override def tryComplete(): Boolean = {
+    fetchPartitionStatus.foreach {
+      case (topicPartition, fetchStatus) =>
+        val fetchOffset = fetchStatus.startOffsetMetadata
+        val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
+        try {
+          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+            replicaManager.getPartitionOrException(topicPartition.topicPartition())
+          }
+        } catch {
+          case _: KafkaStorageException => // Case d
+            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: UnknownTopicOrPartitionException => // Case b
+            debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: FencedLeaderEpochException => // Case e

Review Comment:
   Hmm, `replicaManager.getPartitionOrException` doesn't seem to check the epoch?
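
   For context, the local DelayedFetch path triggers that validation by taking an offset snapshot from the partition, which throws `FencedLeaderEpochException` for a stale epoch. A rough sketch of the same idea (shown in Java for uniformity with the other examples here, though this file is Scala; the `fetchOffsetSnapshot` contract is assumed to match DelayedFetch):

   ```java
   Partition partition = replicaManager.getPartitionOrException(topicPartition.topicPartition());
   // fetchOffsetSnapshot validates the requested leader epoch and throws
   // FencedLeaderEpochException when it is stale (as in DelayedFetch.tryComplete).
   partition.fetchOffsetSnapshot(fetchLeaderEpoch, true);
   ```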



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));

Review Comment:
   Hmm, `LogOffsetMetadata.position` is important for calculating `upperBoundOffset` in `addAbortedTransactions()`. So, we need to set it instead of relying on the default, right?
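
   A sketch of carrying the position through, assuming `startPos` is the byte position of the first fetched batch within the remote segment and that the segment metadata's `startOffset()` is the right base offset:

   ```java
   // Populate segment base offset and relative position instead of the defaults:
   FetchDataInfo fetchDataInfo = new FetchDataInfo(
           new LogOffsetMetadata(offset, rlsMetadata.get().startOffset(), startPos),
           MemoryRecords.readableRecords(buffer));
   ```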



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
Review Comment:
   Could we just use `fetchInfo.lastFetchedEpoch`?
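
   If `lastFetchedEpoch` is reliable for this purpose (an assumption worth confirming), the leader-epoch-cache lookup could collapse to something like:

   ```java
   // FetchRequest.PartitionData.lastFetchedEpoch is an Optional<Integer>:
   OptionalInt epoch = fetchInfo.lastFetchedEpoch
           .map(OptionalInt::of)
           .orElse(OptionalInt.empty());
   ```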



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.log.remote.RemoteLogReadResult
+import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteStorageFetchInfo}
+
+import java.util.concurrent.{CompletableFuture, Future}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection._
+
+/**
+ * A remote fetch operation that can be created by the replica manager and watched
+ * in the remote fetch operation purgatory
+ */
+class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
+                         remoteFetchInfo: RemoteStorageFetchInfo,
+                         fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+                         fetchParams: FetchParams,
+                         localReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                         replicaManager: ReplicaManager,
+                         quota: ReplicaQuota,

Review Comment:
   quota seems unused.



##########
core/src/main/java/kafka/log/remote/RemoteLogReadResult.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+
+import java.util.Optional;
+
+public class RemoteLogReadResult {
+

Review Comment:
   extra new line



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1243,6 +1327,33 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  def createLogReadResult(highWatermark: Long,

Review Comment:
   Could this be private? Ditto below.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.

Review Comment:
   Hmm, not sure that I understand the comment. What is startPos?



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                         RemoteLogSegmentMetadata segmentMetadata,
+                                         FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                    long upperBoundOffset,
+                                    RemoteLogSegmentMetadata segmentMetadata,
+                                    Consumer<List<AbortedTxn>> accumulator) throws RemoteStorageException {
+        TopicPartition topicPartition = segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)
+                .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
+                .map(Collection::iterator)
+                .orElse(Collections.emptyIterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get());
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> indexCache.getIndexEntry(x).txnIndex());
+                    if (!txnIndexOpt.isPresent()) {
+                        searchInLocalLog = true;
+                    }
+                }
+
+                if (searchInLocalLog) {
+                    txnIndexOpt = (localLogSegments.hasNext()) ? Optional.of(localLogSegments.next().txnIndex()) : Optional.empty();

Review Comment:
   localLogSegments could overlap with the remote segment. So, we could introduce duplicate aborted txn records here.
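
   One possible mitigation (a sketch only, not necessarily the right fix): accumulate into a set so that an aborted txn reported by both a remote segment and an overlapping local segment is emitted only once. This assumes `FetchResponseData.AbortedTransaction` has value-based `equals`/`hashCode`, as Kafka's generated protocol classes do:

   ```java
   // Requires java.util.Set / java.util.LinkedHashSet imports.
   final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new LinkedHashSet<>();

   Consumer<List<AbortedTxn>> accumulator =
           abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                   .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));

   // ... then materialize as a list when building the FetchDataInfo:
   // Optional.of(new ArrayList<>(abortedTransactions))
   ```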



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+
+    private final Logger logger;
+    private final RemoteStorageFetchInfo fetchInfo;
+    private final RemoteLogManager rlm;
+    private final Consumer<RemoteLogReadResult> callback;
+
+    public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+                           RemoteLogManager rlm,
+                           Consumer<RemoteLogReadResult> callback) {
+        this.fetchInfo = fetchInfo;
+        this.rlm = rlm;
+        this.callback = callback;
+        logger = new LogContext() {
+            @Override
+            public String logPrefix() {
+                return "[" + Thread.currentThread().getName() + "]";
+            }
+        }.logger(RemoteLogReader.class);
+    }
+
+    @Override
+    public Void call() {
+        RemoteLogReadResult result;
+        try {
+            logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+
+            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
+        } catch (OffsetOutOfRangeException e) {
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        } catch (Exception e) {
+            logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        }
+
+        logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);

Review Comment:
   Should we log fetchInfo too?
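
   A small sketch of that, assuming `RemoteStorageFetchInfo` has (or gains) a meaningful `toString()`:

   ```java
   logger.debug("Finished reading records from remote storage for topic partition {} with fetch info {}",
           fetchInfo.topicPartition, fetchInfo);
   ```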



##########
core/src/main/java/kafka/log/remote/RemoteLogReadResult.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+
+import java.util.Optional;
+
+public class RemoteLogReadResult {
+
+    public final Optional<FetchDataInfo> fetchDataInfo;
+    public final Optional<Throwable> error;
+
+    RemoteLogReadResult(Optional<FetchDataInfo> fetchDataInfo, Optional<Throwable> error) {
+        this.fetchDataInfo = fetchDataInfo;
+        this.error = error;
+    }
+

Review Comment:
   extra new line



##########
core/src/main/scala/kafka/server/DelayedRemoteFetch.scala:
##########
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.log.remote.RemoteLogReadResult
+import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteStorageFetchInfo}
+
+import java.util.concurrent.{CompletableFuture, Future}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection._
+
+/**
+ * A remote fetch operation that can be created by the replica manager and watched
+ * in the remote fetch operation purgatory
+ */
+class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
+                         remoteFetchInfo: RemoteStorageFetchInfo,
+                         fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+                         fetchParams: FetchParams,
+                         localReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                         replicaManager: ReplicaManager,
+                         quota: ReplicaQuota,
+                         responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
+  extends DelayedOperation(fetchParams.maxWaitMs) {
+
+  /**
+   * The operation can be completed if:
+   *
+   * Case a: This broker is no longer the leader of the partition it tries to fetch
+   * Case b: This broker does not know the partition it tries to fetch
+   * Case c: The remote storage read request completed (succeeded or failed)
+   * Case d: The partition is in an offline log directory on this broker
+   * Case e: This broker is the leader, but the requested epoch is now fenced
+   *
+   * Upon completion, should return whatever data is available for each valid partition
+   */
+  override def tryComplete(): Boolean = {
+    fetchPartitionStatus.foreach {
+      case (topicPartition, fetchStatus) =>
+        val fetchOffset = fetchStatus.startOffsetMetadata
+        val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
+        try {
+          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+            replicaManager.getPartitionOrException(topicPartition.topicPartition())
+          }
+        } catch {
+          case _: KafkaStorageException => // Case d
+            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: UnknownTopicOrPartitionException => // Case b
+            debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: FencedLeaderEpochException => // Case e
+            debug(s"Broker is the leader of partition $topicPartition, but the requested epoch " +
+              s"$fetchLeaderEpoch is fenced by the latest leader epoch, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: NotLeaderOrFollowerException =>  // Case a

Review Comment:
   Should we add "follower" to the comment and the log below for case a?



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;

Review Comment:
   Could RemoteLogReader, RemoteLogReadResult and RemoteStorageThreadPool all be in the storage module?



##########
core/src/main/java/kafka/log/remote/RemoteStorageThreadPool.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+    private final Logger logger;
+
+    public RemoteStorageThreadPool(String name,
+                                   String threadNamePrefix,
+                                   int numThreads,
+                                   int maxPendingTasks,
+                                   Time time) {

Review Comment:
   unused param



##########
core/src/main/java/kafka/log/remote/RemoteStorageThreadPool.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+    private final Logger logger;
+
+    public RemoteStorageThreadPool(String name,

Review Comment:
   unused param



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

