[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-10 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1189509107


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +623,204 @@ public String toString() {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            //  - there is no minimum-one-message constraint and
+            //  - the first batch size is more than maximum bytes that can be sent and
+            //  - for FetchRequest version 3 or above.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
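
The interplay of minOneMessage, hardMaxBytesLimit, and maxBytes above carries the sizing decision. A minimal, self-contained sketch of just that rule (the FetchSizeRule helper below is illustrative and not part of the PR):

    // Sketch of the fetch-size rule in the diff above; illustrative only.
    final class FetchSizeRule {
        // With minOneMessage, an oversized first batch is still returned whole;
        // otherwise the read is capped at maxBytes.
        static int updatedFetchSize(boolean minOneMessage, int firstBatchSize, int maxBytes) {
            return (minOneMessage && firstBatchSize > maxBytes) ? firstBatchSize : maxBytes;
        }

        public static void main(String[] args) {
            System.out.println(updatedFetchSize(true, 2048, 1024));  // 2048: whole first batch
            System.out.println(updatedFetchSize(false, 512, 1024));  // 1024: capped at maxBytes
        }
    }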

[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-02 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1183258537


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);

Review Comment:
   Sure, thanks.






[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-02 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1183258439


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   Sure, thanks.






[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-14 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1166318900


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = epoch.isPresent() ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);

Review Comment:
   Would it be possible to send the endOffset as well? Without it, the input stream will potentially contain the rest of the segment and not be consumed to the end.
   In the case of S3, when the input stream is not consumed to the end, the HTTP connection is aborted.
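
   As a minimal sketch of the suggestion, assuming the two-position overload that the RemoteStorageManager interface defines (the helper class and the endPos bound of startPos + maxBytes - 1 below are illustrative, not the PR's code):

       import java.io.InputStream;

       import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
       import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
       import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

       // Illustrative helper (not part of the PR): bound the remote read with an
       // end position so a plugin such as an S3 RSM can serve it as a ranged GET
       // and the returned stream can be consumed to its end.
       final class BoundedSegmentFetch {
           static InputStream fetch(RemoteStorageManager rsm,
                                    RemoteLogSegmentMetadata metadata,
                                    int startPos,
                                    int maxBytes) throws RemoteStorageException {
               int endPos = startPos + maxBytes - 1; // inclusive end position; assumed bound
               return rsm.fetchLogSegment(metadata, startPos, endPos);
           }
       }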






[GitHub] [kafka] jeqo commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-13 Thread via GitHub


jeqo commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1165444921


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   I understand a new PR will come to address this, but could we provide further context (in the source code or the PR) about the implications of using only the first topic-partition?


