Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1166846747
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
}
}
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+ int updatedFetchSize =
Review Comment:
Is there a risk of overflowing the `buffer` here if it is allocated up to
`maxBytes` and the first batch is larger than that?
This could happen if the partition read here is not the first one in the
Fetch request, in which case `minOneMessage` is false [*]. In the local
counterpart, an empty set of records is returned
([source](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1247-L1250)),
where `firstEntryIncomplete` describes the case where (in the notation used
here) `minOneMessage` is false and `firstBatch.sizeInBytes() > maxBytes`.
[*] Even if only one remote partition is served per Fetch request, another
partition from that request may still be served from its local replica log,
so `minOneMessage` can be false for the remote read performed here.
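For illustration, a minimal sketch of a guard that would mirror the local
semantics, placed just before the buffer allocation (this reuses the names
from the diff above; the four-argument `FetchDataInfo` constructor is the
same one used in the `firstBatch == null` branch, and passing `true` for
`firstEntryIncomplete` is my assumption about how the local behavior could
be carried over, not code from this PR):
```java
// Sketch only: bail out before allocating the buffer when at least one
// message was not requested and the first batch cannot fit in maxBytes.
if (!remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes) {
    // Mirrors the local read path: return empty records with
    // firstEntryIncomplete = true instead of overflowing the buffer.
    return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, true,
            includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
}
```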
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
}
}
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                                 RemoteLogSegmentMetadata segmentMetadata,
+                                                 FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                            long upperBoundOffset,
+                                            RemoteLogSegmentMetadata segmentMetadata,
+                                            Consumer<List<AbortedTxn>> accumulator) throws RemoteStorageException {
+        TopicPartition topicPartition = segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)
+                .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
+                .map(Collection::iterator)
+                .orElse(Collections.emptyIterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get());
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> indexCache.getIndexEntry(x).txnIndex());
Review Comment:
There may be an availability risk here. The remote index cache allows up to
1,024 entries by default, and a transaction index entry can be rotated in and
out of the cache indefinitely, so that retries of Fetch requests for a
partition at a given (constant) offset repeatedly trigger a download of the
index. This, combined with the global lock guarding read and write access to
the remote index cache, can lead to lock contention on the remote log reader
thread pool. In the worst case, a partition can be prevented from making
progress on the fetch path.
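As a rough illustration of one way to narrow the lock scope (hypothetical
types and names, not this PR's `RemoteIndexCache`): per-key memoization lets
concurrent readers of the same segment share a single in-flight index
download, without serializing readers of other segments behind a global
lock. Eviction is omitted for brevity, so this sketch addresses only the
contention, not the thrashing itself:
```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch: callers loading the index of the same segment block
// on that segment's future only; loads of other segments are independent.
class PerKeyIndexCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> entries = new ConcurrentHashMap<>();

    V getOrLoad(K segmentId, Function<K, V> fetchIndexFromRemote) {
        // computeIfAbsent installs exactly one future per key, so at most
        // one download of a given index is in flight at any time.
        return entries.computeIfAbsent(segmentId,
            k -> CompletableFuture.supplyAsync(() -> fetchIndexFromRemote.apply(k))).join();
    }
}
```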
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
Review Comment:
Agreed - there are consumption patterns which diverge from the local case
with this approach, that is, uneven progress across the partitions consumed
from a topic, even when those partitions are comparable w.r.t. record batch
size and overall size.
It may be preferable not to diverge from the local approach and to read from
all the remote partitions found in the `fetchInfos`. A different read
pattern which provides greater performance for a specific operational
environment and workload could then be enabled via a configuration property.
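As a sketch of the alternative (written in Java to match the
`RemoteLogManager` code above; `delayedRemoteStorageFetch` is assumed here
to be an accessor on the read result exposing this PR's
`RemoteStorageFetchInfo`, which may not match the actual shape):
```java
// Sketch only: accumulate every partition that must be served from remote
// storage instead of retaining just the first one encountered, so all
// remote partitions make progress in one Fetch request, as on the local path.
List<RemoteStorageFetchInfo> remoteFetchInfos = new ArrayList<>();
for (LogReadResult result : logReadResults) {
    result.info().delayedRemoteStorageFetch().ifPresent(remoteFetchInfos::add);
}
// A configuration property could later select a different read pattern
// (e.g. first-partition-only) for workloads where that performs better.
```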
##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1273,17 +1328,45 @@ class ReplicaManager(val config: KafkaConfig,
                 _: FencedLeaderEpochException |
                 _: ReplicaNotAvailableException |
                 _: KafkaStorageException |
-                _: OffsetOutOfRangeException |
                 _: InconsistentTopicIdException) =>
-        LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-          divergingEpoch = None,
-          highWatermark = UnifiedLog.UnknownOffset,
-          leaderLogStartOffset = UnifiedLog.UnknownOffset,
-          leaderLogEndOffset = UnifiedLog.UnknownOffset,
-          followerLogStartOffset = UnifiedLog.UnknownOffset,
-          fetchTimeMs = -1L,
-          lastStableOffset = None,
-          exception = Some(e))
+        createLogReadResult(e)
+      case e: OffsetOutOfRangeException =>
Review Comment:
I assume that eventually we won't use exception-based control flow here,
i.e. we won't rely on this `OffsetOutOfRangeException` being caught to
redirect the request to the remote read code path?
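For illustration, a minimal sketch of what a non-exception-based contract
could look like (hypothetical `LogReadOutcome` type, not part of this PR):
the read path returns an explicit outcome, and the caller branches on data
rather than catching an exception to discover that a remote read is needed.
```java
import java.util.Optional;

// Hypothetical sketch: "this offset lives in the remote tier" is modeled
// as data instead of as an exception used for control flow.
final class LogReadOutcome {
    final Optional<FetchDataInfo> localData;            // present: served from the local log
    final Optional<RemoteStorageFetchInfo> remoteFetch; // present: must be read from the remote tier

    private LogReadOutcome(Optional<FetchDataInfo> localData, Optional<RemoteStorageFetchInfo> remoteFetch) {
        this.localData = localData;
        this.remoteFetch = remoteFetch;
    }

    static LogReadOutcome local(FetchDataInfo info) {
        return new LogReadOutcome(Optional.of(info), Optional.empty());
    }

    static LogReadOutcome remote(RemoteStorageFetchInfo fetchInfo) {
        return new LogReadOutcome(Optional.empty(), Optional.of(fetchInfo));
    }
}
```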