This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9976437e11 KAFKA-16780: Txn consumer exerts pressure on remote
storage when collecting aborted txns (#17659)
b9976437e11 is described below
commit b9976437e11dee877f32376d18140641ef4ccbc7
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Fri Nov 8 14:49:09 2024 +0530
KAFKA-16780: Txn consumer exerts pressure on remote storage when collecting
aborted txns (#17659)
- KIP-1058
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions)
- Unit and Integration tests added.
Reviewers: Divij Vaidya <[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 116 ++++++++++++++---
.../kafka/log/remote/RemoteLogManagerTest.java | 86 ++++++++++++
.../remote/storage/RemoteLogMetadataManager.java | 21 +++
.../remote/storage/RemoteLogSegmentMetadata.java | 83 +++++++++++-
.../storage/NoOpRemoteLogMetadataManager.java | 5 +
.../ClassLoaderAwareRemoteLogMetadataManager.java | 5 +
.../metadata/storage/RemoteLogMetadataCache.java | 54 +++++---
.../storage/RemoteLogSegmentMetadataSnapshot.java | 51 +++++++-
.../storage/RemotePartitionMetadataStore.java | 23 ++--
.../TopicBasedRemoteLogMetadataManager.java | 11 ++
.../RemoteLogSegmentMetadataSnapshotTransform.java | 6 +-
.../RemoteLogSegmentMetadataTransform.java | 5 +-
.../storage/internals/log/TransactionIndex.java | 8 ++
.../message/RemoteLogSegmentMetadataRecord.json | 8 ++
.../RemoteLogSegmentMetadataSnapshotRecord.json | 8 ++
.../storage/RemoteLogMetadataFormatterTest.java | 4 +-
...oteLogSegmentMetadataSnapshotTransformTest.java | 13 +-
.../internals/log/TransactionIndexTest.java | 20 +++
.../tiered/storage/TieredStorageTestBuilder.java | 14 +-
.../tiered/storage/TieredStorageTestHarness.java | 4 +
.../tiered/storage/actions/ConsumeAction.java | 75 +++++++++--
.../OffloadAndTxnConsumeFromLeaderTest.java | 106 +++++++++++++++
.../kafka/tiered/storage/specs/FetchableSpec.java | 6 +-
.../tiered/storage/specs/RemoteFetchCount.java | 145 +++++++++++++++++++++
.../tiered/storage/specs/RemoteFetchSpec.java | 20 +--
25 files changed, 805 insertions(+), 92 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5d2ae4cf2e0..4317f0c5bc3 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -584,13 +584,32 @@ public class RemoteLogManager implements Closeable {
int epochForOffset,
long offset) throws RemoteStorageException {
Uuid topicId = topicIdByPartitionMap.get(topicPartition);
-
if (topicId == null) {
throw new KafkaException("No topic id registered for topic
partition: " + topicPartition);
}
return remoteLogMetadataManager.remoteLogSegmentMetadata(new
TopicIdPartition(topicId, topicPartition), epochForOffset, offset);
}
+ /**
+ * Returns the next segment that may contain the aborted transaction
entries. The search ensures that the returned
+ * segment offsets are greater than or equal to the given offset and in
the same epoch.
+ * @param topicPartition topic partition to search
+ * @param epochForOffset the epoch
+ * @param offset the offset
+ * @return The next segment that contains the transaction index in the
same epoch.
+ * @throws RemoteStorageException If an error occurs while fetching the
remote log segment metadata.
+ */
+ public Optional<RemoteLogSegmentMetadata>
fetchNextSegmentWithTxnIndex(TopicPartition topicPartition,
+ int
epochForOffset,
+
long offset) throws RemoteStorageException {
+ Uuid topicId = topicIdByPartitionMap.get(topicPartition);
+ if (topicId == null) {
+ throw new KafkaException("No topic id registered for topic
partition: " + topicPartition);
+ }
+ TopicIdPartition tpId = new TopicIdPartition(topicId, topicPartition);
+ return remoteLogMetadataManager.nextSegmentWithTxnIndex(tpId,
epochForOffset, offset);
+ }
+
Optional<FileRecords.TimestampAndOffset>
lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long
startingOffset)
throws RemoteStorageException, IOException {
int startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp,
startingOffset);
@@ -973,9 +992,10 @@ public class RemoteLogManager implements Closeable {
Map<Integer, Long> segmentLeaderEpochs = new
HashMap<>(epochEntries.size());
epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch,
entry.startOffset));
+ boolean isTxnIdxEmpty = segment.txnIndex().isEmpty();
RemoteLogSegmentMetadata copySegmentStartedRlsm = new
RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
segment.largestTimestamp(), brokerId, time.milliseconds(),
segment.log().sizeInBytes(),
- segmentLeaderEpochs);
+ segmentLeaderEpochs, isTxnIdxEmpty);
remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
@@ -1036,7 +1056,8 @@ public class RemoteLogManager implements Closeable {
// Update the highest offset in remote storage for this
partition's log so that the local log segments
// are not deleted before they are copied to remote storage.
log.updateHighestOffsetInRemoteStorage(endOffset);
- logger.info("Copied {} to remote storage with segment-id: {}",
logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
+ logger.info("Copied {} to remote storage with segment-id: {}",
+ logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
long bytesLag = log.onlyLocalLogSegmentsSize() -
log.activeSegment().size();
long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
@@ -1740,7 +1761,10 @@ public class RemoteLogManager implements Closeable {
abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
.map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+ long startTimeNs = time.nanoseconds();
collectAbortedTransactions(startOffset, upperBoundOffset,
segmentMetadata, accumulator, log);
+ LOGGER.debug("Time taken to collect: {} aborted transactions for {} in
{} ns", abortedTransactions.size(),
+ segmentMetadata, time.nanoseconds() - startTimeNs);
return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
fetchInfo.records,
@@ -1748,29 +1772,51 @@ public class RemoteLogManager implements Closeable {
Optional.of(abortedTransactions.isEmpty() ?
Collections.emptyList() : new ArrayList<>(abortedTransactions)));
}
+ /**
+ * Collects the aborted transaction entries from the current and
subsequent segments until the upper bound offset.
+ * Note that the accumulated aborted transaction entries might contain
duplicates as it collects the entries across
+ * segments. We are relying on the client to discard the duplicates.
+ * @param startOffset The start offset of the fetch request.
+ * @param upperBoundOffset The upper bound offset of the fetch request.
+ * @param segmentMetadata The current segment metadata.
+ * @param accumulator The accumulator to collect the aborted transactions.
+ * @param log The unified log instance.
+ * @throws RemoteStorageException If an error occurs while fetching the
remote log segment metadata.
+ */
private void collectAbortedTransactions(long startOffset,
long upperBoundOffset,
RemoteLogSegmentMetadata
segmentMetadata,
Consumer<List<AbortedTxn>>
accumulator,
UnifiedLog log) throws
RemoteStorageException {
- // Search in remote segments first.
- Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt =
Optional.of(segmentMetadata);
- while (nextSegmentMetadataOpt.isPresent()) {
- Optional<TransactionIndex> txnIndexOpt =
nextSegmentMetadataOpt.map(metadata ->
indexCache.getIndexEntry(metadata).txnIndex());
+ TopicPartition tp =
segmentMetadata.topicIdPartition().topicPartition();
+ boolean isSearchComplete = false;
+ LeaderEpochFileCache leaderEpochCache =
log.leaderEpochCache().getOrElse(null);
+ Optional<RemoteLogSegmentMetadata> currentMetadataOpt =
Optional.of(segmentMetadata);
+ while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+ RemoteLogSegmentMetadata currentMetadata =
currentMetadataOpt.get();
+ Optional<TransactionIndex> txnIndexOpt =
getTransactionIndex(currentMetadata);
if (txnIndexOpt.isPresent()) {
- TxnIndexSearchResult searchResult =
txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+ TransactionIndex txnIndex = txnIndexOpt.get();
+ TxnIndexSearchResult searchResult =
txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
accumulator.accept(searchResult.abortedTransactions);
- if (searchResult.isComplete) {
- // Return immediately when the search result is complete,
it does not need to go through local log segments.
- return;
- }
+ isSearchComplete = searchResult.isComplete;
+ }
+ if (!isSearchComplete) {
+ currentMetadataOpt = findNextSegmentWithTxnIndex(tp,
currentMetadata.endOffset() + 1, leaderEpochCache);
}
-
- nextSegmentMetadataOpt =
findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
}
-
// Search in local segments
- collectAbortedTransactionInLocalSegments(startOffset,
upperBoundOffset, accumulator, log.logSegments().iterator());
+ if (!isSearchComplete) {
+ collectAbortedTransactionInLocalSegments(startOffset,
upperBoundOffset, accumulator, log.logSegments().iterator());
+ }
+ }
+
+ private Optional<TransactionIndex>
getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+ return !currentMetadata.isTxnIdxEmpty() ?
+ // `ofNullable` is needed for backward compatibility for old
events that were stored in the
+ // `__remote_log_metadata` topic. The old events will return
the `txnIdxEmpty` as false, but the
+ // transaction index may not exist in the remote storage.
+
Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex()) :
Optional.empty();
}
private void collectAbortedTransactionInLocalSegments(long startOffset,
@@ -1803,6 +1849,44 @@ public class RemoteLogManager implements Closeable {
: Optional.empty();
}
+ /**
+ * Returns the next segment metadata that contains the aborted transaction
entries from the given offset.
+ * Note that the search starts from the given (offset-for-epoch, offset)
pair; when there are no segments containing
+ * the transaction index in that epoch, it proceeds to the next epoch
(next-epoch, epoch-start-offset)
+ * and the search ends when the segment metadata is found or the leader
epoch cache is exhausted.
+ * Note that the returned segment metadata may or may not contain the
transaction index.
+ * Visible for testing
+ * @param tp The topic partition.
+ * @param offset The offset to start the search.
+ * @param leaderEpochCache The leader epoch file cache, this could be null.
+ * @return The next segment metadata that contains the transaction index.
The transaction index may or may not exist
+ * in that segment metadata, which depends on the RLMM plugin
implementation. The caller of this method should handle
+ * both cases.
+ * @throws RemoteStorageException If an error occurs while fetching the
remote log segment metadata.
+ */
+ Optional<RemoteLogSegmentMetadata>
findNextSegmentWithTxnIndex(TopicPartition tp,
+ long offset,
+
LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
+ if (leaderEpochCache == null) {
+ return Optional.empty();
+ }
+ OptionalInt initialEpochOpt = leaderEpochCache.epochForOffset(offset);
+ if (initialEpochOpt.isEmpty()) {
+ return Optional.empty();
+ }
+ int initialEpoch = initialEpochOpt.getAsInt();
+ for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) {
+ if (epochEntry.epoch >= initialEpoch) {
+ long startOffset = Math.max(epochEntry.startOffset, offset);
+ Optional<RemoteLogSegmentMetadata> metadataOpt =
fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset);
+ if (metadataOpt.isPresent()) {
+ return metadataOpt;
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
// Visible for testing
RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long
offset) throws IOException {
RecordBatch nextBatch;
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4ea373327d9..e75a6ca85d4 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1414,6 +1414,92 @@ public class RemoteLogManagerTest {
.remoteLogSegmentMetadata(eq(followerTopicIdPartition), anyInt(),
anyLong());
}
+ @Test
+ public void testFetchNextSegmentWithTxnIndex() throws
RemoteStorageException {
+ remoteLogManager.startup();
+ remoteLogManager.onLeadershipChange(
+ Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
remoteLogManager.fetchNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(),
10, 100L);
+
remoteLogManager.fetchNextSegmentWithTxnIndex(followerTopicIdPartition.topicPartition(),
20, 200L);
+
+ verify(remoteLogMetadataManager)
+ .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), anyInt(),
anyLong());
+ verify(remoteLogMetadataManager)
+ .nextSegmentWithTxnIndex(eq(followerTopicIdPartition),
anyInt(), anyLong());
+ }
+
+ @Test
+ public void testFindNextSegmentWithTxnIndex() throws
RemoteStorageException {
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt()))
+ .thenReturn(Optional.of(0L));
+
when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class),
anyInt(), anyLong()))
+ .thenAnswer(ans -> {
+ TopicIdPartition topicIdPartition = ans.getArgument(0);
+ int leaderEpoch = ans.getArgument(1);
+ long offset = ans.getArgument(2);
+ RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
+ Map<Integer, Long> leaderEpochs = new TreeMap<>();
+ leaderEpochs.put(leaderEpoch, offset);
+ RemoteLogSegmentMetadata metadata = new
RemoteLogSegmentMetadata(segmentId,
+ offset, offset + 100, time.milliseconds(), 0,
time.milliseconds(), 1024, leaderEpochs, true);
+ return Optional.of(metadata);
+ });
+
+ remoteLogManager.startup();
+ remoteLogManager.onLeadershipChange(
+ Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
+ // For offset-10, epoch is 0.
+
remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(),
10, cache);
+ verify(remoteLogMetadataManager)
+ .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0),
eq(10L));
+ }
+
+ @Test
+ public void testFindNextSegmentWithTxnIndexTraversesNextEpoch() throws
RemoteStorageException {
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint,
scheduler);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+
when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class),
anyInt()))
+ .thenReturn(Optional.of(0L));
+
when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class),
anyInt(), anyLong()))
+ .thenAnswer(ans -> {
+ TopicIdPartition topicIdPartition = ans.getArgument(0);
+ int leaderEpoch = ans.getArgument(1);
+ long offset = ans.getArgument(2);
+ Optional<RemoteLogSegmentMetadata> metadataOpt =
Optional.empty();
+ if (leaderEpoch == 2) {
+ RemoteLogSegmentId segmentId = new
RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
+ Map<Integer, Long> leaderEpochs = new TreeMap<>();
+ leaderEpochs.put(leaderEpoch, offset);
+ RemoteLogSegmentMetadata metadata = new
RemoteLogSegmentMetadata(segmentId,
+ offset, offset + 100, time.milliseconds(), 0,
time.milliseconds(), 1024, leaderEpochs, true);
+ metadataOpt = Optional.of(metadata);
+ }
+ return metadataOpt;
+ });
+
+ remoteLogManager.startup();
+ remoteLogManager.onLeadershipChange(
+ Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
+ // For offset-10, epoch is 0.
+ // 1. For epoch 0 and 1, it returns empty and
+ // 2. For epoch 2, it returns the segment metadata.
+
remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(),
10, cache);
+ verify(remoteLogMetadataManager)
+ .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0),
eq(10L));
+ verify(remoteLogMetadataManager)
+ .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(1),
eq(100L));
+ verify(remoteLogMetadataManager)
+ .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(2),
eq(200L));
+ }
+
@Test
void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
remoteLogManager.startup();
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 78d48fbd949..2280aa51132 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -209,4 +209,25 @@ public interface RemoteLogMetadataManager extends
Configurable, Closeable {
* @return Total size of the log stored in remote storage in bytes.
*/
long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException;
+
+ /**
+ * Returns the next segment metadata that contains the aborted transaction
entries for the given topic partition, epoch and offset.
+ * <ul>
+ * <li>The default implementation returns the segment metadata that
matches the given epoch and offset
+ * irrespective of the presence of the transaction index.</li>
+ * <li>The custom implementation can optimize by returning the next
segment metadata that contains the txn index
+ * in the given epoch. If there are no segments with txn index in the
given epoch, then return empty.</li>
+ * </ul>
+ * @param topicIdPartition topic partition to search for.
+ * @param epoch leader epoch for the given offset.
+ * @param offset offset
+ * @return The next segment metadata. The transaction index may or may not
exist in the returned segment metadata
+ * which depends on the RLMM plugin implementation. The caller of this
method handles both cases.
+ * @throws RemoteStorageException if any storage-related errors
occur.
+ */
+ default Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+ int
epoch,
+ long
offset) throws RemoteStorageException {
+ return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
+ }
}
\ No newline at end of file
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 9b589322bbf..02918d90625 100644
---
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
*/
private final RemoteLogSegmentState state;
+ /**
+ * Indicates whether the transaction index is empty for this segment.
+ */
+ private final boolean txnIdxEmpty;
+
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
Optional<CustomMetadata> customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long> segmentLeaderEpochs) {
+ this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs,
brokerId, eventTimestampMs, segmentSizeInBytes,
+ customMetadata, state, segmentLeaderEpochs, false);
+ }
+
+ /**
+ * Creates an instance with the given metadata of remote log segment.
+ * <p>
+ * {@code segmentLeaderEpochs} can not be empty. If all the records in
this segment belong to the same leader epoch
+ * then it should have an entry with epoch mapping to start-offset of this
segment.
+ *
+ * @param remoteLogSegmentId Universally unique remote log segment id.
+ * @param startOffset Start offset of this segment (inclusive).
+ * @param endOffset End offset of this segment (inclusive).
+ * @param maxTimestampMs Maximum timestamp in milli seconds in this
segment.
+ * @param brokerId Broker id from which this event is generated.
+ * @param eventTimestampMs Epoch time in milli seconds at which the
remote log segment is copied to the remote tier storage.
+ * @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
+ * @param state State of the respective segment of
remoteLogSegmentId.
+ * @param segmentLeaderEpochs leader epochs occurred within this segment.
+ * @param txnIdxEmpty True if the transaction index is empty,
false otherwise.
+ */
+ public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+ long startOffset,
+ long endOffset,
+ long maxTimestampMs,
+ int brokerId,
+ long eventTimestampMs,
+ int segmentSizeInBytes,
+ Optional<CustomMetadata> customMetadata,
+ RemoteLogSegmentState state,
+ Map<Integer, Long> segmentLeaderEpochs,
+ boolean txnIdxEmpty) {
super(brokerId, eventTimestampMs);
this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId,
"remoteLogSegmentId can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
@@ -128,6 +166,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
}
this.segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new
TreeMap<>(segmentLeaderEpochs));
+ this.txnIdxEmpty = txnIdxEmpty;
}
/**
@@ -164,6 +203,34 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
segmentLeaderEpochs);
}
+ /**
+ * Creates an instance with the given metadata of remote log segment and
its state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}.
+ * <p>
+ * {@code segmentLeaderEpochs} can not be empty. If all the records in
this segment belong to the same leader epoch
+ * then it should have an entry with epoch mapping to start-offset of this
segment.
+ *
+ * @param remoteLogSegmentId Universally unique remote log segment id.
+ * @param startOffset Start offset of this segment (inclusive).
+ * @param endOffset End offset of this segment (inclusive).
+ * @param maxTimestampMs Maximum timestamp in this segment
+ * @param brokerId Broker id from which this event is generated.
+ * @param eventTimestampMs Epoch time in milli seconds at which the
remote log segment is copied to the remote tier storage.
+ * @param segmentSizeInBytes Size of this segment in bytes.
+ * @param segmentLeaderEpochs leader epochs occurred within this segment
+ * @param txnIdxEmpty True if the transaction index is empty,
false otherwise.
+ */
+ public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+ long startOffset,
+ long endOffset,
+ long maxTimestampMs,
+ int brokerId,
+ long eventTimestampMs,
+ int segmentSizeInBytes,
+ Map<Integer, Long> segmentLeaderEpochs,
+ boolean txnIdxEmpty) {
+ this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs,
brokerId, eventTimestampMs, segmentSizeInBytes,
+ Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_STARTED,
segmentLeaderEpochs, txnIdxEmpty);
+ }
/**
* @return unique id of this segment.
@@ -227,6 +294,14 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
return state;
}
+ /**
+ * If true indicates that the transaction index is empty.
+ * @return True if the Transaction index is empty, false otherwise.
+ */
+ public boolean isTxnIdxEmpty() {
+ return txnIdxEmpty;
+ }
+
/**
* Creates a new RemoteLogSegmentMetadata applying the given {@code
rlsmUpdate} on this instance. This method will
* not update this instance.
@@ -241,7 +316,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
endOffset, maxTimestampMs, rlsmUpdate.brokerId(),
rlsmUpdate.eventTimestampMs(),
- segmentSizeInBytes, rlsmUpdate.customMetadata(),
rlsmUpdate.state(), segmentLeaderEpochs);
+ segmentSizeInBytes, rlsmUpdate.customMetadata(),
rlsmUpdate.state(), segmentLeaderEpochs, txnIdxEmpty);
}
@Override
@@ -266,13 +341,14 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
&& Objects.equals(customMetadata, that.customMetadata)
&& state == that.state
&& eventTimestampMs() == that.eventTimestampMs()
- && brokerId() == that.brokerId();
+ && brokerId() == that.brokerId()
+ && txnIdxEmpty == that.txnIdxEmpty;
}
@Override
public int hashCode() {
return Objects.hash(remoteLogSegmentId, startOffset, endOffset,
brokerId(), maxTimestampMs,
- eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes,
customMetadata, state);
+ eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes,
customMetadata, state, txnIdxEmpty);
}
@Override
@@ -288,6 +364,7 @@ public class RemoteLogSegmentMetadata extends
RemoteLogMetadata {
", segmentSizeInBytes=" + segmentSizeInBytes +
", customMetadata=" + customMetadata +
", state=" + state +
+ ", txnIdxEmpty=" + txnIdxEmpty +
'}';
}
diff --git
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
index 6ffcf85b9b0..cb9491360f8 100644
---
a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
+++
b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -79,6 +79,11 @@ public class NoOpRemoteLogMetadataManager implements
RemoteLogMetadataManager {
return 0;
}
+ @Override
+ public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long
offset) {
+ return Optional.empty();
+ }
+
@Override
public void close() throws IOException {
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
index dc2678242f4..1abcbbc20ce 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -106,6 +106,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager
implements RemoteLogMetada
return withClassLoader(() -> delegate.remoteLogSize(topicIdPartition,
leaderEpoch));
}
+ @Override
+ public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long
offset) throws RemoteStorageException {
+ return withClassLoader(() ->
delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
+ }
+
@Override
public void configure(Map<String, ?> configs) {
withClassLoader(() -> {
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 2ee1c29f6e5..b08458b84b8 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -125,29 +125,45 @@ public class RemoteLogMetadataCache {
* @return the requested remote log segment metadata if it exists.
*/
public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int
leaderEpoch, long offset) {
- RemoteLogLeaderEpochState remoteLogLeaderEpochState =
leaderEpochEntries.get(leaderEpoch);
-
- if (remoteLogLeaderEpochState == null) {
- return Optional.empty();
+ RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch,
offset);
+ long epochEndOffset = -1L;
+ if (metadata != null) {
+ // Check whether the given offset with leaderEpoch exists in this
segment.
+ // Check for epoch's offset boundaries within this segment.
+ // 1. Get the next epoch's start offset -1 if exists
+ // 2. If no next epoch exists, then segment end offset can be
considered as epoch's relative end offset.
+ Map.Entry<Integer, Long> nextEntry =
metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
+ epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 :
metadata.endOffset();
}
+ // Return empty when target offset > epoch's end offset.
+ return offset > epochEndOffset ? Optional.empty() :
Optional.ofNullable(metadata);
+ }
- // Look for floor entry as the given offset may exist in this entry.
- RemoteLogSegmentId remoteLogSegmentId =
remoteLogLeaderEpochState.floorEntry(offset);
- if (remoteLogSegmentId == null) {
- // If the offset is lower than the minimum offset available in
metadata then return empty.
- return Optional.empty();
+ public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int
leaderEpoch, long offset) {
+ boolean txnIdxEmpty = true;
+ Optional<RemoteLogSegmentMetadata> metadataOpt =
remoteLogSegmentMetadata(leaderEpoch, offset);
+ while (metadataOpt.isPresent() && txnIdxEmpty) {
+ txnIdxEmpty = metadataOpt.get().isTxnIdxEmpty();
+ if (txnIdxEmpty) { // If txn index is empty, then look for next
segment.
+ metadataOpt = remoteLogSegmentMetadata(leaderEpoch,
metadataOpt.get().endOffset() + 1);
+ }
}
+ return txnIdxEmpty ? Optional.empty() : metadataOpt;
+ }
- RemoteLogSegmentMetadata metadata =
idToSegmentMetadata.get(remoteLogSegmentId);
- // Check whether the given offset with leaderEpoch exists in this
segment.
- // Check for epoch's offset boundaries with in this segment.
- // 1. Get the next epoch's start offset -1 if exists
- // 2. If no next epoch exists, then segment end offset can be
considered as epoch's relative end offset.
- Map.Entry<Integer, Long> nextEntry =
metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
- long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 :
metadata.endOffset();
-
- // Return empty when target offset > epoch's end offset.
- return offset > epochEndOffset ? Optional.empty() :
Optional.of(metadata);
+ private RemoteLogSegmentMetadata getSegmentMetadata(int leaderEpoch, long
offset) {
+ RemoteLogLeaderEpochState remoteLogLeaderEpochState =
leaderEpochEntries.get(leaderEpoch);
+ if (remoteLogLeaderEpochState != null) {
+ // Look for floor entry as the given offset may exist in this
entry.
+ RemoteLogSegmentId remoteLogSegmentId =
remoteLogLeaderEpochState.floorEntry(offset);
+ if (remoteLogSegmentId != null) {
+ return idToSegmentMetadata.get(remoteLogSegmentId);
+ } else {
+ log.warn("No remote segment found for leaderEpoch: {}, offset:
{}", leaderEpoch, offset);
+ }
+ }
+ // If the offset is lower than the minimum offset available in
metadata then return null.
+ return null;
}
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate
metadataUpdate)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index a1ba23dd8f8..64540a7fabd 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
*/
private final RemoteLogSegmentState state;
+ /**
+ * Indicates whether the transaction index is empty for this segment.
+ */
+ private final boolean txnIdxEmpty;
+
/**
* Creates an instance with the given metadata of remote log segment.
* <p>
@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
Optional<CustomMetadata>
customMetadata,
RemoteLogSegmentState state,
Map<Integer, Long>
segmentLeaderEpochs) {
+ this(segmentId, startOffset, endOffset, maxTimestampMs, brokerId,
eventTimestampMs, segmentSizeInBytes,
+ customMetadata, state, segmentLeaderEpochs, false);
+ }
+
+ /**
+ * Creates an instance with the given metadata of remote log segment.
+ * <p>
+ * {@code segmentLeaderEpochs} can not be empty. If all the records in
this segment belong to the same leader epoch
+ * then it should have an entry with epoch mapping to start-offset of this
segment.
+ *
+ * @param segmentId Universally unique remote log segment id.
+ * @param startOffset Start offset of this segment (inclusive).
+ * @param endOffset End offset of this segment (inclusive).
+ * @param maxTimestampMs Maximum timestamp in milliseconds in this
segment.
+ * @param brokerId Broker id from which this event is generated.
+ * @param eventTimestampMs Epoch time in milliseconds at which the
remote log segment is copied to the remote tier storage.
+ * @param segmentSizeInBytes Size of this segment in bytes.
+ * @param customMetadata Custom metadata.
+ * @param state State of the respective segment of
remoteLogSegmentId.
+ * @param segmentLeaderEpochs leader epochs occurred within this segment.
+ * @param txnIdxEmpty true if the transaction index is empty,
false otherwise.
+ */
+ public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
+ long startOffset,
+ long endOffset,
+ long maxTimestampMs,
+ int brokerId,
+ long eventTimestampMs,
+ int segmentSizeInBytes,
+ Optional<CustomMetadata>
customMetadata,
+ RemoteLogSegmentState state,
+ Map<Integer, Long>
segmentLeaderEpochs,
+ boolean txnIdxEmpty) {
super(brokerId, eventTimestampMs);
this.segmentId = Objects.requireNonNull(segmentId, "remoteLogSegmentId
can not be null");
this.state = Objects.requireNonNull(state, "state can not be null");
@@ -114,6 +152,7 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
this.maxTimestampMs = maxTimestampMs;
this.segmentSizeInBytes = segmentSizeInBytes;
this.customMetadata = Objects.requireNonNull(customMetadata,
"customMetadata can not be null");
+ this.txnIdxEmpty = txnIdxEmpty;
if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
throw new IllegalArgumentException("segmentLeaderEpochs can not be
null or empty");
@@ -125,7 +164,7 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
public static RemoteLogSegmentMetadataSnapshot
create(RemoteLogSegmentMetadata metadata) {
return new
RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(),
metadata.startOffset(), metadata.endOffset(),
metadata.maxTimestampMs(),
metadata.brokerId(), metadata.eventTimestampMs(),
-
metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(),
metadata.segmentLeaderEpochs()
+
metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(),
metadata.segmentLeaderEpochs(), metadata.isTxnIdxEmpty()
);
}
@@ -191,6 +230,10 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
return state;
}
+ public boolean isTxnIdxEmpty() {
+ return txnIdxEmpty;
+ }
+
@Override
public TopicIdPartition topicIdPartition() {
throw new UnsupportedOperationException("This metadata does not have
topic partition with it.");
@@ -208,12 +251,13 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
&& Objects.equals(customMetadata, that.customMetadata)
&& Objects.equals(segmentId, that.segmentId)
&& Objects.equals(segmentLeaderEpochs,
that.segmentLeaderEpochs)
- && state == that.state;
+ && state == that.state
+ && txnIdxEmpty == that.txnIdxEmpty;
}
@Override
public int hashCode() {
- return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs,
segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
+ return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs,
segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state, txnIdxEmpty);
}
@Override
@@ -227,6 +271,7 @@ public class RemoteLogSegmentMetadataSnapshot extends
RemoteLogMetadata {
", segmentSizeInBytes=" + segmentSizeInBytes +
", customMetadata=" + customMetadata +
", state=" + state +
+ ", txnIdxEmpty=" + txnIdxEmpty +
'}';
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index 86c29567d36..c74e97fbe68 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -55,7 +55,7 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
@Override
public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata
remoteLogSegmentMetadata) {
- log.debug("Adding remote log segment : [{}]",
remoteLogSegmentMetadata);
+ log.debug("Adding remote log segment: {}", remoteLogSegmentMetadata);
final RemoteLogSegmentId remoteLogSegmentId =
remoteLogSegmentMetadata.remoteLogSegmentId();
TopicIdPartition topicIdPartition =
remoteLogSegmentId.topicIdPartition();
@@ -71,7 +71,7 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
@Override
public void
handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate)
{
- log.debug("Updating remote log segment: [{}]", rlsmUpdate);
+ log.debug("Updating remote log segment: {}", rlsmUpdate);
RemoteLogSegmentId remoteLogSegmentId =
rlsmUpdate.remoteLogSegmentId();
TopicIdPartition topicIdPartition =
remoteLogSegmentId.topicIdPartition();
RemoteLogMetadataCache remoteLogMetadataCache =
idToRemoteLogMetadataCache.get(topicIdPartition);
@@ -88,7 +88,7 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
@Override
public void
handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata
remotePartitionDeleteMetadata) {
- log.debug("Received partition delete state with: [{}]",
remotePartitionDeleteMetadata);
+ log.debug("Received partition delete state with: {}",
remotePartitionDeleteMetadata);
TopicIdPartition topicIdPartition =
remotePartitionDeleteMetadata.topicIdPartition();
idToPartitionDeleteMetadata.put(topicIdPartition,
remotePartitionDeleteMetadata);
@@ -108,30 +108,25 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
public Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition)
throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
-
return
getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
}
public Iterator<RemoteLogSegmentMetadata>
listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
-
return
getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
}
private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition
topicIdPartition)
throws RemoteResourceNotFoundException {
+ Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
RemoteLogMetadataCache remoteLogMetadataCache =
idToRemoteLogMetadataCache.get(topicIdPartition);
if (remoteLogMetadataCache == null) {
throw new RemoteResourceNotFoundException("No resource found for
partition: " + topicIdPartition);
}
-
if (!remoteLogMetadataCache.isInitialized()) {
// Throwing a retriable ReplicaNotAvailableException here for
clients retry.
throw new ReplicaNotAvailableException("Remote log metadata cache
is not initialized for partition: " + topicIdPartition);
}
-
return remoteLogMetadataCache;
}
@@ -139,15 +134,17 @@ public class RemotePartitionMetadataStore extends
RemotePartitionMetadataEventHa
long
offset,
int
epochForOffset)
throws RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
-
return
getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset,
offset);
}
+ public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+ int
epoch,
+ long
offset) throws RemoteStorageException {
+ return
getRemoteLogMetadataCache(topicIdPartition).nextSegmentWithTxnIndex(epoch,
offset);
+ }
+
public Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
int leaderEpoch) throws
RemoteStorageException {
- Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be
null");
-
return
getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index eff498b5437..a5db1ea38ef 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -356,6 +356,17 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
return remoteLogSize;
}
+ @Override
+ public Optional<RemoteLogSegmentMetadata>
nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long
offset) throws RemoteStorageException {
+ lock.readLock().lock();
+ try {
+ ensureInitializedAndNotClosed();
+ return
remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch,
offset);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
@Override
public void configure(Map<String, ?> configs) {
Objects.requireNonNull(configs, "configs can not be null.");
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index ad47ee05c84..4e839d08b3a 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -40,7 +40,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform
implements RemoteLogMetad
.setMaxTimestampMs(segmentMetadata.maxTimestampMs())
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
- .setRemoteLogSegmentState(segmentMetadata.state().id());
+ .setRemoteLogSegmentState(segmentMetadata.state().id())
+ .setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty());
segmentMetadata.customMetadata().ifPresent(md ->
record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record,
record.highestSupportedVersion());
@@ -72,7 +73,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform
implements RemoteLogMetad
record.segmentSizeInBytes(),
customMetadata,
RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
- segmentLeaderEpochs);
+ segmentLeaderEpochs,
+ record.txnIndexEmpty());
}
}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 9e893d2cbc3..99f3fc0d90c 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -44,7 +44,8 @@ public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTrans
.setMaxTimestampMs(segmentMetadata.maxTimestampMs())
.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
- .setRemoteLogSegmentState(segmentMetadata.state().id());
+ .setRemoteLogSegmentState(segmentMetadata.state().id())
+ .setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty());
segmentMetadata.customMetadata().ifPresent(md ->
record.setCustomMetadata(md.value()));
return new ApiMessageAndVersion(record,
record.highestSupportedVersion());
@@ -83,7 +84,7 @@ public class RemoteLogSegmentMetadataTransform implements
RemoteLogMetadataTrans
new RemoteLogSegmentMetadata(remoteLogSegmentId,
record.startOffset(), record.endOffset(),
record.maxTimestampMs(),
record.brokerId(),
record.eventTimestampMs(),
record.segmentSizeInBytes(),
- segmentLeaderEpochs);
+ segmentLeaderEpochs,
record.txnIndexEmpty());
RemoteLogSegmentMetadataUpdate rlsmUpdate
= new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId,
record.eventTimestampMs(),
customMetadata,
diff --git
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 10d1449cdb7..8e089dc3cfc 100644
---
a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++
b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -194,6 +194,14 @@ public class TransactionIndex implements Closeable {
}
}
+ /**
+     * Check if the index is empty.
+     * @return `true` if the index is empty or the underlying file doesn't
exist, `false` otherwise.
+ */
+ public boolean isEmpty() {
+ return !iterable().iterator().hasNext();
+ }
+
private FileChannel openChannel() throws IOException {
FileChannel channel = FileChannel.open(file.toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.READ, StandardOpenOption.WRITE);
diff --git
a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index c737135a6a2..9c035f52630 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -129,6 +129,14 @@
"type": "int8",
"versions": "0+",
"about": "State identifier of the remote log segment, which is
RemoteLogSegmentState.id()."
+ },
+ {
+ "name": "TxnIndexEmpty",
+ "type": "bool",
+ "versions": "0+",
+ "about": "Flag to indicate if the transaction index is empty.",
+ "taggedVersions": "0+",
+ "tag": 0
}
]
}
\ No newline at end of file
diff --git
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index 20fb1732572..f4a1f19dca4 100644
---
a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++
b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -95,6 +95,14 @@
"type": "int8",
"versions": "0+",
"about": "State of the remote log segment"
+ },
+ {
+ "name": "TxnIndexEmpty",
+ "type": "bool",
+ "versions": "0+",
+ "about": "Flag to indicate if the transaction index is empty.",
+ "taggedVersions": "0+",
+ "tag": 0
}
]
}
\ No newline at end of file
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index d6a03441e8b..47925d01a7d 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest {
Optional<CustomMetadata> customMetadata = Optional.of(new
CustomMetadata(new byte[10]));
RemoteLogSegmentMetadata remoteLogMetadata = new
RemoteLogSegmentMetadata(
remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024,
customMetadata, COPY_SEGMENT_STARTED,
- segLeaderEpochs);
+ segLeaderEpochs, true);
byte[] metadataBytes = new
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
@@ -65,7 +65,7 @@ public class RemoteLogMetadataFormatterTest {
"startOffset=0, endOffset=100, brokerId=1,
maxTimestampMs=-1, " +
"eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20,
2=80}, segmentSizeInBytes=1024, " +
"customMetadata=Optional[CustomMetadata{10 bytes}], " +
- "state=COPY_SEGMENT_STARTED}\n",
+ "state=COPY_SEGMENT_STARTED, txnIdxEmpty=true}\n",
TOPIC_ID, SEGMENT_ID);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos)) {
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
index b4c4110ecea..399529129ff 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
class RemoteLogSegmentMetadataSnapshotTransformTest {
@ParameterizedTest
@MethodSource("parameters")
- void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+ void testToAndFromMessage(Optional<CustomMetadata> customMetadata, boolean
isTxnIdxEmpty) {
Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
segmentLeaderEpochs.put(0, 0L);
RemoteLogSegmentMetadataSnapshot snapshot = new
RemoteLogSegmentMetadataSnapshot(
@@ -43,7 +43,8 @@ class RemoteLogSegmentMetadataSnapshotTransformTest {
0L, 100L, -1L, 0, 0, 1234,
customMetadata,
RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
- segmentLeaderEpochs
+ segmentLeaderEpochs,
+ isTxnIdxEmpty
);
RemoteLogSegmentMetadataSnapshotTransform transform = new
RemoteLogSegmentMetadataSnapshotTransform();
@@ -51,11 +52,11 @@ class RemoteLogSegmentMetadataSnapshotTransformTest {
assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
}
- private static Stream<Object> parameters() {
+ private static Stream<Object[]> parameters() {
return Stream.of(
- Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
- Optional.of(new CustomMetadata(new byte[0])),
- Optional.empty()
+ new Object[]{Optional.of(new CustomMetadata(new byte[]{0, 1,
2, 3})), true},
+ new Object[]{Optional.of(new CustomMetadata(new byte[0])),
false},
+ new Object[]{Optional.empty(), true}
);
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
index 7a75ab0cced..1d19d433e05 100644
---
a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
+++
b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
@@ -202,4 +202,24 @@ public class TransactionIndexTest {
index.deleteIfExists();
assertFalse(file.exists());
}
+
+ @Test
+ public void testIsEmptyWhenFileDoesNotExist() throws IOException {
+ File nonExistentFile = TestUtils.tempFile();
+ assertTrue(nonExistentFile.delete());
+ try (TransactionIndex testIndex = new TransactionIndex(0,
nonExistentFile)) {
+ assertTrue(testIndex.isEmpty());
+ }
+ }
+
+ @Test
+ public void testIsEmptyWhenFileIsEmpty() {
+ assertTrue(index.isEmpty());
+ }
+
+ @Test
+ public void testIsEmptyWhenFileIsNotEmpty() throws IOException {
+ index.append(new AbortedTxn(0L, 0, 10, 2));
+ assertFalse(index.isEmpty());
+ }
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index a93dea813d1..c51405c8636 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -53,6 +53,7 @@ import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
import org.apache.kafka.tiered.storage.specs.ProducableSpec;
import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
@@ -228,10 +229,17 @@ public final class TieredStorageTestBuilder {
public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer
fromBroker,
String topic,
Integer
partition,
- Integer
remoteFetchRequestCount) {
+ Integer
segmentFetchRequestCount) {
+ return expectFetchFromTieredStorage(fromBroker, topic, partition, new
RemoteFetchCount(segmentFetchRequestCount));
+ }
+
+ public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer
fromBroker,
+ String topic,
+ Integer
partition,
+
RemoteFetchCount remoteFetchRequestCount) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
assertTrue(partition >= 0, "Partition must be >= 0");
- assertTrue(remoteFetchRequestCount >= 0, "Expected fetch count from
tiered storage must be >= 0");
+
assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0,
"Expected fetch count from tiered storage must be >= 0");
assertFalse(fetchables.containsKey(topicPartition), "Consume already
in progress for " + topicPartition);
fetchables.put(topicPartition, new FetchableSpec(fromBroker,
remoteFetchRequestCount));
return this;
@@ -371,7 +379,7 @@ public final class TieredStorageTestBuilder {
private void createConsumeAction() {
if (!consumables.isEmpty()) {
consumables.forEach((topicPartition, consumableSpec) -> {
- FetchableSpec fetchableSpec =
fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, 0));
+ FetchableSpec fetchableSpec =
fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, new
RemoteFetchCount(0)));
RemoteFetchSpec remoteFetchSpec = new
RemoteFetchSpec(fetchableSpec.getSourceBrokerId(), topicPartition,
fetchableSpec.getFetchCount());
ConsumeAction action = new ConsumeAction(topicPartition,
consumableSpec.getFetchOffset(),
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 97feae05654..4b4c401b1d2 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -92,12 +92,16 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
protected abstract void writeTestSpecifications(TieredStorageTestBuilder
builder);
+ protected void overrideConsumerConfig(Properties consumerConfig) {
+ }
+
@BeforeEach
@Override
public void setUp(TestInfo testInfo) {
testClassName =
testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" +
testClassName).getAbsolutePath();
super.setUp(testInfo);
+ overrideConsumerConfig(consumerConfig());
context = new TieredStorageTestContext(this);
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index 250db6a46e8..9549a6b6916 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -25,20 +25,26 @@ import
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
import java.io.PrintStream;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
import static
org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
import static
org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
public final class ConsumeAction implements TieredStorageTestAction {
@@ -75,6 +81,9 @@ public final class ConsumeAction implements
TieredStorageTestAction {
// type has yet to happen.
LocalTieredStorageHistory history =
context.tieredStorageHistory(remoteFetchSpec.getSourceBrokerId());
Optional<LocalTieredStorageEvent> latestEventSoFar =
history.latestEvent(FETCH_SEGMENT, topicPartition);
+ Optional<LocalTieredStorageEvent> latestOffsetIdxEventSoFar =
history.latestEvent(FETCH_OFFSET_INDEX, topicPartition);
+ Optional<LocalTieredStorageEvent> latestTimeIdxEventSoFar =
history.latestEvent(FETCH_TIME_INDEX, topicPartition);
+ Optional<LocalTieredStorageEvent> latestTxnIdxEventSoFar =
history.latestEvent(FETCH_TRANSACTION_INDEX, topicPartition);
// Records are consumed here
List<ConsumerRecord<String, String>> consumedRecords =
@@ -119,16 +128,61 @@ public final class ConsumeAction implements
TieredStorageTestAction {
assertThat(storedRecords, correspondTo(readRecords, topicPartition,
serde, serde));
// (B) Assessment of the interactions between the source broker and
the second-tier storage.
- List<LocalTieredStorageEvent> events =
history.getEvents(FETCH_SEGMENT, topicPartition);
- List<LocalTieredStorageEvent> eventsInScope = latestEventSoFar
- .map(latestEvent ->
- events.stream().filter(event ->
event.isAfter(latestEvent)).collect(Collectors.toList()))
- .orElse(events);
-
- assertEquals(remoteFetchSpec.getCount(), eventsInScope.size(),
- "Number of fetch requests from broker " +
remoteFetchSpec.getSourceBrokerId() + " to the " +
- "tier storage does not match the expected value for
topic-partition "
- + remoteFetchSpec.getTopicPartition());
+ for (LocalTieredStorageEvent.EventType eventType :
Arrays.asList(FETCH_SEGMENT, FETCH_OFFSET_INDEX, FETCH_TIME_INDEX,
FETCH_TRANSACTION_INDEX)) {
+ Optional<LocalTieredStorageEvent> latestEvent;
+ switch (eventType) {
+ case FETCH_SEGMENT:
+ latestEvent = latestEventSoFar;
+ break;
+ case FETCH_OFFSET_INDEX:
+ latestEvent = latestOffsetIdxEventSoFar;
+ break;
+ case FETCH_TIME_INDEX:
+ latestEvent = latestTimeIdxEventSoFar;
+ break;
+ case FETCH_TRANSACTION_INDEX:
+ latestEvent = latestTxnIdxEventSoFar;
+ break;
+ default:
+ latestEvent = Optional.empty();
+ }
+
+ List<LocalTieredStorageEvent> events =
history.getEvents(eventType, topicPartition);
+ List<LocalTieredStorageEvent> eventsInScope = latestEvent
+ .map(e -> events.stream().filter(event ->
event.isAfter(e)).collect(Collectors.toList()))
+ .orElse(events);
+
+ RemoteFetchCount remoteFetchCount =
remoteFetchSpec.getRemoteFetchCount();
+ RemoteFetchCount.FetchCountAndOp expectedCountAndOp;
+ switch (eventType) {
+ case FETCH_SEGMENT:
+ expectedCountAndOp =
remoteFetchCount.getSegmentFetchCountAndOp();
+ break;
+ case FETCH_OFFSET_INDEX:
+ expectedCountAndOp =
remoteFetchCount.getOffsetIdxFetchCountAndOp();
+ break;
+ case FETCH_TIME_INDEX:
+ expectedCountAndOp =
remoteFetchCount.getTimeIdxFetchCountAndOp();
+ break;
+ case FETCH_TRANSACTION_INDEX:
+ expectedCountAndOp =
remoteFetchCount.getTxnIdxFetchCountAndOp();
+ break;
+ default:
+ expectedCountAndOp = new
RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO);
+ }
+
+ String message = String.format("Number of %s requests from broker
%d to the tier storage does not match the expected value for topic-partition
%s",
+ eventType, remoteFetchSpec.getSourceBrokerId(),
remoteFetchSpec.getTopicPartition());
+ if (expectedCountAndOp.getCount() != -1) {
+ if (expectedCountAndOp.getOperationType() ==
RemoteFetchCount.OperationType.EQUALS_TO) {
+ assertEquals(expectedCountAndOp.getCount(),
eventsInScope.size(), message);
+ } else if (expectedCountAndOp.getOperationType() ==
RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
+ assertTrue(eventsInScope.size() <=
expectedCountAndOp.getCount(), message);
+ } else {
+ assertTrue(eventsInScope.size() >=
expectedCountAndOp.getCount(), message);
+ }
+ }
+ }
}
@Override
@@ -138,5 +192,6 @@ public final class ConsumeAction implements
TieredStorageTestAction {
output.println(" fetch-offset = " + fetchOffset);
output.println(" expected-record-count = " + expectedTotalCount);
output.println(" expected-record-from-tiered-storage = " +
expectedFromSecondTierCount);
+ output.println(" remote-fetch-spec = " + remoteFetchSpec);
}
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
new file mode 100644
index 00000000000..38b7ae3df2d
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp;
+import static
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.EQUALS_TO;
+import static
org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
+
+/**
+ * Test Cases:
+ * Elementary offloads and fetches from tiered storage using consumer with
read_committed isolation level.
+ */
+public final class OffloadAndTxnConsumeFromLeaderTest extends
TieredStorageTestHarness {
+
+ /**
+ * Cluster of one broker
+ * @return number of brokers in the cluster
+ */
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ public Properties overridingProps() {
+ Properties props = super.overridingProps();
+ // Configure the remote-log index cache size to hold one entry to
simulate eviction of cached index entries.
+
props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
"1");
+ return props;
+ }
+
+ @Override
+ protected void overrideConsumerConfig(Properties consumerConfig) {
+ consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString());
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final Integer broker = 0;
+ final String topicA = "topicA";
+ final Integer p0 = 0;
+ final Integer partitionCount = 1;
+ final Integer replicationFactor = 1;
+ final Integer oneBatchPerSegment = 1;
+ final Map<Integer, List<Integer>> replicaAssignment = null;
+ final boolean enableRemoteLogStorage = true;
+
+ builder
+ .createTopic(topicA, partitionCount, replicationFactor,
oneBatchPerSegment, replicaAssignment,
+ enableRemoteLogStorage)
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 4, new
KeyValueSpec("k4", "v4"))
+ .expectSegmentToBeOffloaded(broker, topicA, p0, 5, new
KeyValueSpec("k5", "v5"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 6L)
+ .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3"), new KeyValueSpec("k4", "v4"),
+ new KeyValueSpec("k5", "v5"), new KeyValueSpec("k6",
"v6"))
+ // When reading with transactional consumer, the consecutive
remote fetch indexes are fetched until the
+ // LSO found is higher than the fetch-offset.
+ // summation(n) = (n * (n + 1)) / 2
+ // Total number of uploaded remote segments = 6. Total number
of index fetches = (6 * (6 + 1)) / 2 = 21
+ // Note that we skip the index fetch when the txn-index is
empty, so the effective index fetch count
+ // should be same as the segment count.
+ .expectFetchFromTieredStorage(broker, topicA, p0,
getRemoteFetchCount())
+ .consume(topicA, p0, 0L, 7, 6);
+ }
+
+ private static RemoteFetchCount getRemoteFetchCount() {
+ FetchCountAndOp segmentFetchCountAndOp = new FetchCountAndOp(6,
EQUALS_TO);
+ // RemoteIndexCache might evict the entries much before reaching the
maximum size.
+ // To make the test deterministic, we are using the operation type as
LESS_THAN_OR_EQUALS_TO which equals to the
+ // number of times the RemoteIndexCache gets accessed. The
RemoteIndexCache gets accessed twice for each read.
+ FetchCountAndOp indexFetchCountAndOp = new FetchCountAndOp(12,
LESS_THAN_OR_EQUALS_TO);
+ return new RemoteFetchCount(segmentFetchCountAndOp,
indexFetchCountAndOp,
+ indexFetchCountAndOp, indexFetchCountAndOp);
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
index 48dd34052b7..9e2f23af7a7 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
@@ -21,10 +21,10 @@ import java.util.Objects;
public final class FetchableSpec {
private final Integer sourceBrokerId;
- private final Integer fetchCount;
+ private final RemoteFetchCount fetchCount;
public FetchableSpec(Integer sourceBrokerId,
- Integer fetchCount) {
+ RemoteFetchCount fetchCount) {
this.sourceBrokerId = sourceBrokerId;
this.fetchCount = fetchCount;
}
@@ -33,7 +33,7 @@ public final class FetchableSpec {
return sourceBrokerId;
}
- public Integer getFetchCount() {
+ public RemoteFetchCount getFetchCount() {
return fetchCount;
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
new file mode 100644
index 00000000000..8dfc9a762b8
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.specs;
+
+import java.util.Objects;
+
/**
 * Expected numbers of remote fetches (segment and per-index) issued against tiered storage,
 * each paired with the comparison to apply when asserting the observed count.
 *
 * <p>A count of -1 for an index expectation means "do not assert". Note: instances are mutable
 * (setters below), so they should not be used as hash-based collection keys after mutation.
 */
public class RemoteFetchCount {
    private FetchCountAndOp segmentFetchCountAndOp;
    // Index fetch expectations default to -1, i.e. unchecked.
    private FetchCountAndOp offsetIdxFetchCountAndOp = new FetchCountAndOp(-1);
    private FetchCountAndOp timeIdxFetchCountAndOp = new FetchCountAndOp(-1);
    private FetchCountAndOp txnIdxFetchCountAndOp = new FetchCountAndOp(-1);

    /**
     * Asserts only the segment fetch count (with EQUALS_TO); index fetches are unchecked.
     *
     * @param segmentFetchCount expected number of remote segment fetches
     */
    public RemoteFetchCount(int segmentFetchCount) {
        this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCount);
    }

    /**
     * Asserts all four counts with the default EQUALS_TO comparison.
     *
     * @param segmentFetchCount   expected number of remote segment fetches
     * @param offsetIdxFetchCount expected number of offset-index fetches
     * @param timeIdxFetchCount   expected number of time-index fetches
     * @param txnIdxFetchCount    expected number of transaction-index fetches
     */
    public RemoteFetchCount(int segmentFetchCount,
                            int offsetIdxFetchCount,
                            int timeIdxFetchCount,
                            int txnIdxFetchCount) {
        this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCount);
        this.offsetIdxFetchCountAndOp = new FetchCountAndOp(offsetIdxFetchCount);
        this.timeIdxFetchCountAndOp = new FetchCountAndOp(timeIdxFetchCount);
        this.txnIdxFetchCountAndOp = new FetchCountAndOp(txnIdxFetchCount);
    }

    /**
     * Asserts all four counts, each with its own comparison operator.
     */
    public RemoteFetchCount(FetchCountAndOp segmentFetchCountAndOp,
                            FetchCountAndOp offsetIdxFetchCountAndOp,
                            FetchCountAndOp timeIdxFetchCountAndOp,
                            FetchCountAndOp txnIdxFetchCountAndOp) {
        this.segmentFetchCountAndOp = segmentFetchCountAndOp;
        this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp;
        this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp;
        this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp;
    }

    public FetchCountAndOp getSegmentFetchCountAndOp() {
        return segmentFetchCountAndOp;
    }

    public void setSegmentFetchCountAndOp(FetchCountAndOp segmentFetchCountAndOp) {
        this.segmentFetchCountAndOp = segmentFetchCountAndOp;
    }

    public FetchCountAndOp getOffsetIdxFetchCountAndOp() {
        return offsetIdxFetchCountAndOp;
    }

    public void setOffsetIdxFetchCountAndOp(FetchCountAndOp offsetIdxFetchCountAndOp) {
        this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp;
    }

    public FetchCountAndOp getTimeIdxFetchCountAndOp() {
        return timeIdxFetchCountAndOp;
    }

    public void setTimeIdxFetchCountAndOp(FetchCountAndOp timeIdxFetchCountAndOp) {
        this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp;
    }

    public FetchCountAndOp getTxnIdxFetchCountAndOp() {
        return txnIdxFetchCountAndOp;
    }

    public void setTxnIdxFetchCountAndOp(FetchCountAndOp txnIdxFetchCountAndOp) {
        this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp;
    }

    @Override
    public String toString() {
        return "RemoteFetchCount{" +
                "segmentFetchCountAndOp=" + segmentFetchCountAndOp +
                ", offsetIdxFetchCountAndOp=" + offsetIdxFetchCountAndOp +
                ", timeIdxFetchCountAndOp=" + timeIdxFetchCountAndOp +
                ", txnIdxFetchCountAndOp=" + txnIdxFetchCountAndOp +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        RemoteFetchCount that = (RemoteFetchCount) o;
        return Objects.equals(segmentFetchCountAndOp, that.segmentFetchCountAndOp) &&
                Objects.equals(offsetIdxFetchCountAndOp, that.offsetIdxFetchCountAndOp) &&
                Objects.equals(timeIdxFetchCountAndOp, that.timeIdxFetchCountAndOp) &&
                Objects.equals(txnIdxFetchCountAndOp, that.txnIdxFetchCountAndOp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(segmentFetchCountAndOp, offsetIdxFetchCountAndOp, timeIdxFetchCountAndOp, txnIdxFetchCountAndOp);
    }

    /** How the observed fetch count must relate to the expected count. */
    public enum OperationType {
        EQUALS_TO,
        GREATER_THAN_OR_EQUALS_TO,
        LESS_THAN_OR_EQUALS_TO
    }

    /** An expected count together with the comparison operator to apply. Immutable. */
    public static class FetchCountAndOp {
        private final int count;
        private final OperationType operationType;

        public FetchCountAndOp(int count) {
            this.count = count;
            this.operationType = OperationType.EQUALS_TO;
        }

        public FetchCountAndOp(int count, OperationType operationType) {
            this.count = count;
            this.operationType = operationType;
        }

        public int getCount() {
            return count;
        }

        public OperationType getOperationType() {
            return operationType;
        }

        // BUGFIX: equals/hashCode were missing; the enclosing RemoteFetchCount.equals()
        // compares these via Objects.equals, which previously fell back to identity,
        // so value-equal RemoteFetchCount specs never compared equal.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            FetchCountAndOp that = (FetchCountAndOp) o;
            return count == that.count && operationType == that.operationType;
        }

        @Override
        public int hashCode() {
            return Objects.hash(count, operationType);
        }

        @Override
        public String toString() {
            return "FetchCountAndOp{" +
                    "count=" + count +
                    ", operationType=" + operationType +
                    '}';
        }
    }
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
index 5a17ebee0cd..823d4321bc7 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
@@ -24,7 +24,7 @@ public final class RemoteFetchSpec {
private final int sourceBrokerId;
private final TopicPartition topicPartition;
- private final int count;
+ private final RemoteFetchCount remoteFetchCount;
/**
* Specifies a fetch (download) event from a second-tier storage. This is
used to ensure the
@@ -32,14 +32,14 @@ public final class RemoteFetchSpec {
*
* @param sourceBrokerId The broker which fetched (a) remote log
segment(s) from the second-tier storage.
* @param topicPartition The topic-partition which segment(s) were fetched.
- * @param count The number of remote log segment(s) fetched.
+ * @param remoteFetchCount The number of remote log segment(s) and indexes
fetched.
*/
public RemoteFetchSpec(int sourceBrokerId,
TopicPartition topicPartition,
- int count) {
+ RemoteFetchCount remoteFetchCount) {
this.sourceBrokerId = sourceBrokerId;
this.topicPartition = topicPartition;
- this.count = count;
+ this.remoteFetchCount = remoteFetchCount;
}
public int getSourceBrokerId() {
@@ -50,14 +50,14 @@ public final class RemoteFetchSpec {
return topicPartition;
}
- public int getCount() {
- return count;
+ public RemoteFetchCount getRemoteFetchCount() {
+ return remoteFetchCount;
}
@Override
public String toString() {
- return String.format("RemoteFetch[source-broker-id=%d
topic-partition=%s count=%d]",
- sourceBrokerId, topicPartition, count);
+ return String.format("RemoteFetch[source-broker-id=%d
topic-partition=%s remote-fetch-count=%s]",
+ sourceBrokerId, topicPartition, remoteFetchCount);
}
@Override
@@ -66,12 +66,12 @@ public final class RemoteFetchSpec {
if (o == null || getClass() != o.getClass()) return false;
RemoteFetchSpec that = (RemoteFetchSpec) o;
return sourceBrokerId == that.sourceBrokerId
- && count == that.count
+ && Objects.equals(remoteFetchCount, that.remoteFetchCount)
&& Objects.equals(topicPartition, that.topicPartition);
}
@Override
public int hashCode() {
- return Objects.hash(sourceBrokerId, topicPartition, count);
+ return Objects.hash(sourceBrokerId, topicPartition, remoteFetchCount);
}
}