Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1166846747


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =

Review Comment:
   Is there a risk here to overflow the `buffer` if it is allocated up to 
`maxBytes` and the first batch is greater than that?
   
   This could happen if the partition read here is not the first one from the 
Fetch request in which case `minOneMessage` is false [*]. In the local 
counterpart, an empty set of records is returned 
([source](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1247-L1250),
 where `firstEntryIncomplete` describes the case where (using the equivalent 
notation used here) `minOneMessage` was false and `firstBatch.sizeInBytes() > 
maxBytes`).
   
   [*] Even if only one remote partition is served per Fetch request, there can 
still be another partition from that request served from its local replica log 
hence a possibility for not at least one message requested to be read here.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) 
throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = 
remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == 
FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = 
logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " 
+ offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not 
exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is 
greater than or equal to the target offset
+            remoteSegInputStream = 
remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new 
RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, 
offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), 
MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? 
Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && 
firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the 
batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's 
remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new 
LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = 
addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), 
fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, 
"RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                         RemoteLogSegmentMetadata 
segmentMetadata,
+                                         FetchDataInfo fetchInfo) throws 
RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new 
OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = 
indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = 
offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = 
new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, 
segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                    long upperBoundOffset,
+                                    RemoteLogSegmentMetadata segmentMetadata,
+                                    Consumer<List<AbortedTxn>> accumulator) 
throws RemoteStorageException {
+        TopicPartition topicPartition = 
segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)
+                .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
+                .map(Collection::iterator)
+                .orElse(Collections.emptyIterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = 
Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = 
nextSegmentMetadataOpt.map(metadata -> 
indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = 
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = 
findNextSegmentMetadata(nextSegmentMetadataOpt.get());
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> 
indexCache.getIndexEntry(x).txnIndex());

Review Comment:
   There may be an availability risk here. The remote index cache allows up to 
1,024 entries by default and it is possible to keep a transaction index entry 
permanently rotated in and out of the cache so that retries of Fetch requests 
for a partition at a given (constant) offset permanently trigger a download of 
the index. This, plus the fact there is a global lock for read and write access 
on the remote index cache, can lead to lock contention on the remote log reader 
thread pool. In the worst case, it is possible for a partition to be prevented 
from making progress on the fetch path.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, 
readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, 
readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   Agreed - there are consumption patterns which diverge from the local case 
with this approach (that is, uneven progress across the partitions consumed 
from a topic [with said partitions of the same nature w.r.t. record batch size 
and overall size]).
   
   It may be preferable not to diverge from the local approach and read from 
all the remote partitions found in the `fetchInfos`. Then, a different read 
pattern which provides greater performance for a specific operational 
environment and workload could be enforced via a configuration property. 



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1273,17 +1328,45 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new 
FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>

Review Comment:
   I assume eventually we won't use exception-based branch control and not rely 
on this exception to be re-directed to the remote read code path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to