[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-11 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1190991070


##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {

Review Comment:
   Should we add a test for this use case:
   ```
   minOneMessage = false
   hardMaxBytesLimit = false
   firstBatchSize > maxBytes?
   ```
   or a combination thereof?
   
   Also, please let me know if I can help with a few unit tests. Thanks!
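   
   For illustration, a minimal sketch of what such a test could look like at the 
   `RemoteLogReader` level (the stubbed empty-records result is an assumption about 
   what `RemoteLogManager.read()` would return for this flag combination, and a 
   `MemoryRecords` import is assumed; the mock setup follows the existing tests in 
   this file):
   ```
   @Test
   public void testReaderReturnsEmptyRecordsWhenFirstBatchExceedsMaxBytes() throws Exception {
       RemoteLogManager mockRLM = mock(RemoteLogManager.class);
       // Assumed behavior under minOneMessage = false, hardMaxBytesLimit = false,
       // firstBatchSize > maxBytes: the manager returns empty records.
       when(mockRLM.read(any(RemoteStorageFetchInfo.class)))
               .thenReturn(new FetchDataInfo(new LogOffsetMetadata(100), MemoryRecords.EMPTY));

       @SuppressWarnings("unchecked")
       Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
       RemoteLogReader reader = new RemoteLogReader(mock(RemoteStorageFetchInfo.class), mockRLM, callback);
       reader.call();

       // The callback should receive a successful result carrying zero bytes of records.
       ArgumentCaptor<RemoteLogReadResult> captor = ArgumentCaptor.forClass(RemoteLogReadResult.class);
       verify(callback, times(1)).accept(captor.capture());
       assertFalse(captor.getValue().error.isPresent());
       assertTrue(captor.getValue().fetchDataInfo.isPresent());
       assertEquals(0, captor.getValue().fetchDataInfo.get().records.sizeInBytes());
   }
   ```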






[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-09 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1188387671


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +623,210 @@ public String toString() {
        }
    }

-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            //  - there is no minimum-one-message constraint and
+            //  - the first batch size is more than the maximum bytes that can be sent and
+            //  - the request is FetchRequest version 3 or above.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+ 

[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-03 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1183397975


##
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+
+    private final Logger logger;
+    private final RemoteStorageFetchInfo fetchInfo;
+    private final RemoteLogManager rlm;
+    private final Consumer<RemoteLogReadResult> callback;
+
+    public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+                           RemoteLogManager rlm,
+                           Consumer<RemoteLogReadResult> callback) {
+        this.fetchInfo = fetchInfo;
+        this.rlm = rlm;
+        this.callback = callback;
+        logger = new LogContext() {
+            @Override
+            public String logPrefix() {
+                return "[" + Thread.currentThread().getName() + "]";
+            }
+        }.logger(RemoteLogReader.class);
+    }
+
+    @Override
+    public Void call() {
+        RemoteLogReadResult result;
+        try {
+            logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+
+            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
+        } catch (OffsetOutOfRangeException e) {
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        } catch (Exception e) {
+            logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        }
+
+        logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);

Review Comment:
   Should we report the offset-out-of-range error when applicable?
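   
   For example (a minimal sketch; the log level and message wording are 
   assumptions, not part of this PR):
   ```
   } catch (OffsetOutOfRangeException e) {
       // Hypothetical: surface the out-of-range read before handing the error to the callback.
       logger.debug("Requested offset {} for topic partition {} is out of the range covered by the remote tier",
               fetchInfo.fetchInfo.fetchOffset, fetchInfo.topicPartition, e);
       result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
   }
   ```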



##
core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static 

[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-02 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1182493022


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1160,48 +1171,100 @@ class ReplicaManager(val config: KafkaConfig,
          fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
        })
      }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {
+        val key = new TopicPartitionOperationKey(remoteFetchInfo.get.topicPartition.topic(), remoteFetchInfo.get.topicPartition.partition())
+        val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
+        var remoteFetchTask: Future[Void] = null
+        try {
+          remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo.get, (result: RemoteLogReadResult) => {
+            remoteFetchResult.complete(result)
+            delayedRemoteFetchPurgatory.checkAndComplete(key)
+          })
+        } catch {
+          // if the task queue of the remote storage reader thread pool is full, return what we currently have
+          // (the data read from the local log segments for the other topic-partitions) and an error for the
+          // topic-partition that we couldn't read from remote storage
+          case e: RejectedExecutionException =>
+            val fetchPartitionData = logReadResults.map { case (tp, result) =>
+              val r = {
+                if (tp.topicPartition().equals(remoteFetchInfo.get.topicPartition))
+                  createLogReadResult(e)

Review Comment:
   Should we add a log if the nature of the error is not propagated?
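   
   For example (a sketch in the handler's Scala; the message wording is an 
   assumption, and `warn` is the `Logging` trait helper already available in 
   `ReplicaManager`):
   ```
   case e: RejectedExecutionException =>
     // Hypothetical: record the rejection, since the cause is otherwise only
     // surfaced as a generic error for the affected topic-partition.
     warn(s"Remote storage read for ${remoteFetchInfo.get.topicPartition} was rejected " +
       "because the remote log reader thread pool queue is full", e)
   ```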






[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-05-02 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1180448490


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,25 +622,208 @@ public String toString() {
        }
    }

-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";

Review Comment:
   nit: instead of `NOT_AVAILABLE`, maybe the message could report that the log 
start offset is strictly greater than the fetch offset?
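   
   For example, something along these lines (a sketch; the exact wording is an 
   assumption):
   ```
   // Hypothetical rewording of the exception when no leader epoch is found for the
   // fetch offset, hinting at the likely cause instead of printing "NOT AVAILABLE".
   throw new OffsetOutOfRangeException("Received request for offset " + offset + " for partition " + tp
           + ", but no leader epoch could be resolved for it, likely because the log start offset is "
           + "strictly greater than the fetch offset.");
   ```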



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   Got it, thanks.



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
        }
    }

+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);

[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-14 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1166887367


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
        }
    }

+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                                 RemoteLogSegmentMetadata segmentMetadata,
+                                                 FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+

[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

2023-04-14 Thread via GitHub


Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1166846747


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
        }
    }

+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =

Review Comment:
   Is there a risk of overflowing the `buffer` here if it is allocated up to 
   `maxBytes` and the first batch is greater than that?
   
   This could happen if the partition read here is not the first one in the 
   Fetch request, in which case `minOneMessage` is false [*]. The local 
   counterpart returns an empty set of records in that situation 
   ([source](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1247-L1250), 
   where `firstEntryIncomplete` describes the case where, in the notation used 
   here, `minOneMessage` is false and `firstBatch.sizeInBytes() > maxBytes`).
   
   [*] Even if only one remote partition is served per Fetch request, another 
   partition from that request can still be served from its local replica log, 
   so the read performed here is not guaranteed to carry the 
   minimum-one-message constraint.
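   
   For illustration, a sketch of the kind of guard this would need, mirroring 
   the local read path (a later revision of this PR added essentially this 
   check to `RemoteLogManager.read()`; variable names follow that code):
   ```
   int firstBatchSize = firstBatch.sizeInBytes();
   // Mirror the local log's firstEntryIncomplete handling: return empty records
   // rather than a truncated batch when no minimum-one-message constraint applies,
   // there is no hard max-bytes limit, and the first batch alone exceeds maxBytes.
   if (!remoteStorageFetchInfo.minOneMessage &&
           !remoteStorageFetchInfo.hardMaxBytesLimit &&
           firstBatchSize > maxBytes) {
       return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
   }
   ```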



##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -600,6 +622,176 @@ public String toString() {
        }
    }

+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for