This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
     new 6ea2ed3db05 KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (#17676) (#17733)
6ea2ed3db05 is described below
commit 6ea2ed3db050b89b1a491329a55f6a66a035d53c
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Sun Nov 10 22:54:26 2024 +0530
    KAFKA-17801: RemoteLogManager may compute inaccurate upperBoundOffset for aborted txns (#17676) (#17733)
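    When serving a read from tiered storage, findFirstBatch() may skip over
    one or more record batches before reaching the batch that contains the
    target offset, so the position obtained from the offset index (startPos)
    can point behind the batch that is actually returned. Building the
    LogOffsetMetadata from the requested offset and that stale position can
    then yield an inaccurate upperBoundOffset when aborted transactions are
    collected for the fetch. findFirstBatch() now returns an
    EnrichedRecordBatch that also carries the number of bytes skipped during
    the scan; read() adds the skipped bytes to startPos and reports the first
    batch's baseOffset, keeping the returned offset and segment position
    consistent.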
Reviewers: Jun Rao <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 39 ++++++++---
.../kafka/log/remote/RemoteLogManagerTest.java | 79 ++++++++++++++++++++--
2 files changed, 103 insertions(+), 15 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6f4f5daa9c0..577f9b080de 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -1610,25 +1610,26 @@ public class RemoteLogManager implements Closeable {
         }
 
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        EnrichedRecordBatch enrichedRecordBatch = new EnrichedRecordBatch(null, 0);
         InputStream remoteSegInputStream = null;
         try {
             int startPos = 0;
-            RecordBatch firstBatch = null;
-
             // Iteration over multiple RemoteSegmentMetadata is required in case of log compaction.
             // It may be possible the offset is log compacted in the current RemoteLogSegmentMetadata
             // And we need to iterate over the next segment metadata to fetch messages higher than the given offset.
-            while (firstBatch == null && rlsMetadataOptional.isPresent()) {
+            while (enrichedRecordBatch.batch == null && rlsMetadataOptional.isPresent()) {
                 remoteLogSegmentMetadata = rlsMetadataOptional.get();
                 // Search forward for the position of the last offset that is greater than or equal to the target offset
                 startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
                 remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
                 RemoteLogInputStream remoteLogInputStream = getRemoteLogInputStream(remoteSegInputStream);
-                firstBatch = findFirstBatch(remoteLogInputStream, offset);
-                if (firstBatch == null) {
+                enrichedRecordBatch = findFirstBatch(remoteLogInputStream, offset);
+                if (enrichedRecordBatch.batch == null) {
+                    Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
                     rlsMetadataOptional = findNextSegmentMetadata(rlsMetadataOptional.get(), logOptional.get().leaderEpochCache());
                 }
             }
+            RecordBatch firstBatch = enrichedRecordBatch.batch;
             if (firstBatch == null)
                 return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
                         includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
@@ -1659,8 +1660,9 @@ public class RemoteLogManager implements Closeable {
             }
 
             buffer.flip();
+            startPos = startPos + enrichedRecordBatch.skippedBytes;
             FetchDataInfo fetchDataInfo = new FetchDataInfo(
-                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    new LogOffsetMetadata(firstBatch.baseOffset(), remoteLogSegmentMetadata.startOffset(), startPos),
                     MemoryRecords.readableRecords(buffer));
             if (includeAbortedTxns) {
                 fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
@@ -1668,7 +1670,9 @@ public class RemoteLogManager implements Closeable {
             return fetchDataInfo;
         } finally {
-            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            if (enrichedRecordBatch.batch != null) {
+                Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+            }
         }
     }
 
     // for testing
@@ -1763,15 +1767,18 @@ public class RemoteLogManager implements Closeable {
     }
 
     // Visible for testing
-    RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
-        RecordBatch nextBatch;
+    EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+        int skippedBytes = 0;
+        RecordBatch nextBatch = null;
         // Look for the batch which has the desired offset
         // We will always have a batch in that segment as it is a non-compacted topic.
         do {
+            if (nextBatch != null) {
+                skippedBytes += nextBatch.sizeInBytes();
+            }
             nextBatch = remoteLogInputStream.nextBatch();
         } while (nextBatch != null && nextBatch.lastOffset() < offset);
-
-        return nextBatch;
+        return new EnrichedRecordBatch(nextBatch, skippedBytes);
     }
 
     OffsetAndEpoch findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
@@ -2122,4 +2129,14 @@ public class RemoteLogManager implements Closeable {
                     '}';
         }
     }
+
+    static class EnrichedRecordBatch {
+        private final RecordBatch batch;
+        private final int skippedBytes;
+
+        public EnrichedRecordBatch(RecordBatch batch, int skippedBytes) {
+            this.batch = batch;
+            this.skippedBytes = skippedBytes;
+        }
+    }
 }
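For illustration, here is a minimal, self-contained Java sketch of the
scan-and-count pattern the new findFirstBatch() follows. It is not Kafka
code: FakeBatch stands in for RecordBatch, and the 91-byte batch size and
the offsets are assumptions chosen to mirror the test added below, where
skipping three 3-record batches before the one holding offset 10 accounts
for the expected 273 bytes.

    // Illustrative sketch only: FakeBatch stands in for RecordBatch and
    // all sizes/offsets are invented for the example.
    import java.util.Iterator;
    import java.util.List;

    public class SkippedBytesSketch {
        record FakeBatch(long baseOffset, long lastOffset, int sizeInBytes) { }

        // Return the base offset of the first batch whose lastOffset >= target,
        // together with the bytes consumed while skipping earlier batches
        // (the patch's skippedBytes).
        static long[] findFirstBatch(Iterator<FakeBatch> it, long target) {
            int skippedBytes = 0;
            FakeBatch next = null;
            do {
                if (next != null)
                    skippedBytes += next.sizeInBytes(); // bytes already read past
                next = it.hasNext() ? it.next() : null;
            } while (next != null && next.lastOffset() < target);
            return new long[] { next == null ? -1 : next.baseOffset(), skippedBytes };
        }

        public static void main(String[] args) {
            // Four 91-byte batches of 3 records each: [0-2], [3-5], [6-8], [9-11].
            // Target offset 10 lives in the batch starting at 9, reached after
            // skipping 3 * 91 = 273 bytes, matching the test's assertions.
            List<FakeBatch> batches = List.of(
                new FakeBatch(0, 2, 91), new FakeBatch(3, 5, 91),
                new FakeBatch(6, 8, 91), new FakeBatch(9, 11, 91));
            long[] result = findFirstBatch(batches.iterator(), 10);
            System.out.println("baseOffset=" + result[0] + " skippedBytes=" + result[1]);
        }
    }

In the patch itself those skipped bytes are added to startPos, so the
relativePositionInSegment reported in LogOffsetMetadata lands exactly on
the batch whose baseOffset is returned.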
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 81d7e3f100f..24c89ce3970 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -135,6 +135,7 @@ import scala.Option;
 import scala.collection.JavaConverters;
 
 import static kafka.log.remote.RemoteLogManager.isRemoteSegmentWithinLeaderEpochs;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -150,6 +151,7 @@ import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_LOG_READER_FETCH_RATE_AND_TIME_METRIC;
 import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.REMOTE_STORAGE_THREAD_POOL_METRICS;
+import static org.apache.kafka.test.TestUtils.tempFile;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -2932,8 +2934,8 @@ public class RemoteLogManagerTest {
                 }
 
                 // This is the key scenario that we are testing here
-                RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
-                    return null;
+                EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+                    return new EnrichedRecordBatch(null, 0);
                 }
             }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -2999,10 +3001,10 @@ public class RemoteLogManagerTest {
                     return 1;
                 }
 
-                RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
+                EnrichedRecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) {
                     when(firstBatch.sizeInBytes()).thenReturn(recordBatchSizeInBytes);
                     doNothing().when(firstBatch).writeTo(capture.capture());
-                    return firstBatch;
+                    return new EnrichedRecordBatch(firstBatch, 0);
                 }
             }) {
             FetchDataInfo fetchDataInfo = remoteLogManager.read(fetchInfo);
@@ -3396,6 +3398,75 @@ public class RemoteLogManagerTest {
         assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteCopyLagSegments());
     }
 
+    @Test
+    public void testRemoteReadFetchDataInfo() throws RemoteStorageException, IOException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.remoteLogSegmentMetadata(eq(leaderTopicIdPartition), anyInt(), anyLong()))
+            .thenAnswer(ans -> {
+                long offset = ans.getArgument(2);
+                RemoteLogSegmentId segmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
+                RemoteLogSegmentMetadata segmentMetadata = createRemoteLogSegmentMetadata(segmentId,
+                    offset - 10, offset + 99, 1024, totalEpochEntries, RemoteLogSegmentState.COPY_SEGMENT_FINISHED);
+                return Optional.of(segmentMetadata);
+            });
+
+        File segmentFile = tempFile();
+        appendRecordsToFile(segmentFile, 100, 3);
+        FileInputStream fileInputStream = new FileInputStream(segmentFile);
+        when(remoteStorageManager.fetchLogSegment(any(RemoteLogSegmentMetadata.class), anyInt()))
+            .thenReturn(fileInputStream);
+
+        RemoteLogManager remoteLogManager = new RemoteLogManager(config.remoteLogManagerConfig(), brokerId, logDir, clusterId, time,
+            tp -> Optional.of(mockLog),
+            (topicPartition, offset) -> currentLogStartOffset.set(offset),
+            brokerTopicStats, metrics) {
+            public RemoteStorageManager createRemoteStorageManager() {
+                return remoteStorageManager;
+            }
+            public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                return remoteLogMetadataManager;
+            }
+            int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+                return 0;
+            }
+        };
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+            Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.emptySet(), topicIds);
+
+        long fetchOffset = 10;
+        FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(
+            Uuid.randomUuid(), fetchOffset, 0, 100, Optional.empty());
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(
+            1048576, true, leaderTopicIdPartition.topicPartition(),
+            partitionData, FetchIsolation.HIGH_WATERMARK, false);
+        FetchDataInfo fetchDataInfo = remoteLogManager.read(remoteStorageFetchInfo);
+        // firstBatch baseOffset may not be equal to the fetchOffset
+        assertEquals(9, fetchDataInfo.fetchOffsetMetadata.messageOffset);
+        assertEquals(273, fetchDataInfo.fetchOffsetMetadata.relativePositionInSegment);
+    }
+
+    private void appendRecordsToFile(File file, int nRecords, int nRecordsPerBatch) throws IOException {
+        byte magic = RecordBatch.CURRENT_MAGIC_VALUE;
+        Compression compression = Compression.NONE;
+        long offset = 0;
+        List<SimpleRecord> records = new ArrayList<>();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            for (long counter = 1; counter < nRecords + 1; counter++) {
+                records.add(new SimpleRecord("foo".getBytes()));
+                if (counter % nRecordsPerBatch == 0) {
+                    fileRecords.append(MemoryRecords.withRecords(magic, offset, compression, CREATE_TIME,
+                        records.toArray(new SimpleRecord[0])));
+                    offset += records.size();
+                    records.clear();
+                }
+            }
+            fileRecords.flush();
+        }
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);