This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9976437e11 KAFKA-16780: Txn consumer exerts pressure on remote storage when collecting aborted txns (#17659)
b9976437e11 is described below

commit b9976437e11dee877f32376d18140641ef4ccbc7
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Fri Nov 8 14:49:09 2024 +0530

    KAFKA-16780: Txn consumer exerts pressure on remote storage when collecting aborted txns (#17659)
    
    - KIP-1058 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058:+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions)
    - Unit and Integration tests added.
    
    Reviewers: Divij Vaidya <[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 116 ++++++++++++++---
 .../kafka/log/remote/RemoteLogManagerTest.java     |  86 ++++++++++++
 .../remote/storage/RemoteLogMetadataManager.java   |  21 +++
 .../remote/storage/RemoteLogSegmentMetadata.java   |  83 +++++++++++-
 .../storage/NoOpRemoteLogMetadataManager.java      |   5 +
 .../ClassLoaderAwareRemoteLogMetadataManager.java  |   5 +
 .../metadata/storage/RemoteLogMetadataCache.java   |  54 +++++---
 .../storage/RemoteLogSegmentMetadataSnapshot.java  |  51 +++++++-
 .../storage/RemotePartitionMetadataStore.java      |  23 ++--
 .../TopicBasedRemoteLogMetadataManager.java        |  11 ++
 .../RemoteLogSegmentMetadataSnapshotTransform.java |   6 +-
 .../RemoteLogSegmentMetadataTransform.java         |   5 +-
 .../storage/internals/log/TransactionIndex.java    |   8 ++
 .../message/RemoteLogSegmentMetadataRecord.json    |   8 ++
 .../RemoteLogSegmentMetadataSnapshotRecord.json    |   8 ++
 .../storage/RemoteLogMetadataFormatterTest.java    |   4 +-
 ...oteLogSegmentMetadataSnapshotTransformTest.java |  13 +-
 .../internals/log/TransactionIndexTest.java        |  20 +++
 .../tiered/storage/TieredStorageTestBuilder.java   |  14 +-
 .../tiered/storage/TieredStorageTestHarness.java   |   4 +
 .../tiered/storage/actions/ConsumeAction.java      |  75 +++++++++--
 .../OffloadAndTxnConsumeFromLeaderTest.java        | 106 +++++++++++++++
 .../kafka/tiered/storage/specs/FetchableSpec.java  |   6 +-
 .../tiered/storage/specs/RemoteFetchCount.java     | 145 +++++++++++++++++++++
 .../tiered/storage/specs/RemoteFetchSpec.java      |  20 +--
 25 files changed, 805 insertions(+), 92 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 5d2ae4cf2e0..4317f0c5bc3 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -584,13 +584,32 @@ public class RemoteLogManager implements Closeable {
                                                                             int epochForOffset,
                                                                             long offset) throws RemoteStorageException {
         Uuid topicId = topicIdByPartitionMap.get(topicPartition);
-
         if (topicId == null) {
             throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
         }
         return remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset);
     }
 
+    /**
+     * Returns the next segment that may contain the aborted transaction entries. The search ensures that the returned
+     * segment offsets are greater than or equal to the given offset and in the same epoch.
+     * @param topicPartition topic partition to search
+     * @param epochForOffset the epoch
+     * @param offset the offset
+     * @return The next segment that contains the transaction index in the same epoch.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
+    public Optional<RemoteLogSegmentMetadata> fetchNextSegmentWithTxnIndex(TopicPartition topicPartition,
+                                                                           int epochForOffset,
+                                                                           long offset) throws RemoteStorageException {
+        Uuid topicId = topicIdByPartitionMap.get(topicPartition);
+        if (topicId == null) {
+            throw new KafkaException("No topic id registered for topic partition: " + topicPartition);
+        }
+        TopicIdPartition tpId = new TopicIdPartition(topicId, topicPartition);
+        return remoteLogMetadataManager.nextSegmentWithTxnIndex(tpId, epochForOffset, offset);
+    }
+
     Optional<FileRecords.TimestampAndOffset> lookupTimestamp(RemoteLogSegmentMetadata rlsMetadata, long timestamp, long startingOffset)
             throws RemoteStorageException, IOException {
         int startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset);
@@ -973,9 +992,10 @@ public class RemoteLogManager implements Closeable {
             Map<Integer, Long> segmentLeaderEpochs = new HashMap<>(epochEntries.size());
             epochEntries.forEach(entry -> segmentLeaderEpochs.put(entry.epoch, entry.startOffset));
 
+            boolean isTxnIdxEmpty = segment.txnIndex().isEmpty();
             RemoteLogSegmentMetadata copySegmentStartedRlsm = new RemoteLogSegmentMetadata(segmentId, segment.baseOffset(), endOffset,
                     segment.largestTimestamp(), brokerId, time.milliseconds(), segment.log().sizeInBytes(),
-                    segmentLeaderEpochs);
+                    segmentLeaderEpochs, isTxnIdxEmpty);
 
             remoteLogMetadataManager.addRemoteLogSegmentMetadata(copySegmentStartedRlsm).get();
 
@@ -1036,7 +1056,8 @@ public class RemoteLogManager implements Closeable {
             // Update the highest offset in remote storage for this partition's log so that the local log segments
             // are not deleted before they are copied to remote storage.
             log.updateHighestOffsetInRemoteStorage(endOffset);
-            logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
+            logger.info("Copied {} to remote storage with segment-id: {}",
+                    logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
 
             long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
             long segmentsLag = log.onlyLocalLogSegmentsCount() - 1;
@@ -1740,7 +1761,10 @@ public class RemoteLogManager implements Closeable {
                 abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
                         .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
 
+        long startTimeNs = time.nanoseconds();
         collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
+        LOGGER.debug("Time taken to collect: {} aborted transactions for {} in {} ns", abortedTransactions.size(),
+                segmentMetadata, time.nanoseconds() - startTimeNs);
 
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
                 fetchInfo.records,
@@ -1748,29 +1772,51 @@ public class RemoteLogManager implements Closeable {
                 Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
     }
 
+    /**
+     * Collects the aborted transaction entries from the current and subsequent segments until the upper bound offset.
+     * Note that the accumulated aborted transaction entries might contain duplicates as it collects the entries across
+     * segments. We are relying on the client to discard the duplicates.
+     * @param startOffset The start offset of the fetch request.
+     * @param upperBoundOffset The upper bound offset of the fetch request.
+     * @param segmentMetadata The current segment metadata.
+     * @param accumulator The accumulator to collect the aborted transactions.
+     * @param log The unified log instance.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
     private void collectAbortedTransactions(long startOffset,
                                             long upperBoundOffset,
                                             RemoteLogSegmentMetadata segmentMetadata,
                                             Consumer<List<AbortedTxn>> accumulator,
                                             UnifiedLog log) throws RemoteStorageException {
-        // Search in remote segments first.
-        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
-        while (nextSegmentMetadataOpt.isPresent()) {
-            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+        TopicPartition tp = segmentMetadata.topicIdPartition().topicPartition();
+        boolean isSearchComplete = false;
+        LeaderEpochFileCache leaderEpochCache = log.leaderEpochCache().getOrElse(null);
+        Optional<RemoteLogSegmentMetadata> currentMetadataOpt = Optional.of(segmentMetadata);
+        while (!isSearchComplete && currentMetadataOpt.isPresent()) {
+            RemoteLogSegmentMetadata currentMetadata = currentMetadataOpt.get();
+            Optional<TransactionIndex> txnIndexOpt = getTransactionIndex(currentMetadata);
             if (txnIndexOpt.isPresent()) {
-                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                TransactionIndex txnIndex = txnIndexOpt.get();
+                TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
                 accumulator.accept(searchResult.abortedTransactions);
-                if (searchResult.isComplete) {
-                    // Return immediately when the search result is complete, it does not need to go through local log segments.
-                    return;
-                }
+                isSearchComplete = searchResult.isComplete;
+            }
+            if (!isSearchComplete) {
+                currentMetadataOpt = findNextSegmentWithTxnIndex(tp, currentMetadata.endOffset() + 1, leaderEpochCache);
             }
-
-            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
         }
-
         // Search in local segments
-        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        if (!isSearchComplete) {
+            collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, log.logSegments().iterator());
+        }
+    }
+
+    private Optional<TransactionIndex> getTransactionIndex(RemoteLogSegmentMetadata currentMetadata) {
+        return !currentMetadata.isTxnIdxEmpty() ?
+                // `ofNullable` is needed for backward compatibility for old events that were stored in the
+                // `__remote_log_metadata` topic. The old events will return the `txnIdxEmpty` as false, but the
+                // transaction index may not exist in the remote storage.
+                Optional.ofNullable(indexCache.getIndexEntry(currentMetadata).txnIndex()) : Optional.empty();
     }
 
     private void collectAbortedTransactionInLocalSegments(long startOffset,
@@ -1803,6 +1849,44 @@ public class RemoteLogManager implements Closeable {
                 : Optional.empty();
     }
 
+    /**
+     * Returns the next segment metadata that contains the aborted transaction entries from the given offset.
+     * Note that the search starts from the given (epoch-for-offset, offset) pair; when no segment in that epoch
+     * contains the transaction index, it proceeds to the next epoch (next-epoch, epoch-start-offset),
+     * and the search ends when the segment metadata is found or the leader epoch cache is exhausted.
+     * Note that the returned segment metadata may or may not contain the transaction index.
+     * Visible for testing
+     * @param tp The topic partition.
+     * @param offset The offset to start the search.
+     * @param leaderEpochCache The leader epoch file cache, this could be null.
+     * @return The next segment metadata that contains the transaction index. The transaction index may or may not exist
+     * in that segment metadata, depending on the RLMM plugin implementation. The caller of this method should handle
+     * both cases.
+     * @throws RemoteStorageException If an error occurs while fetching the remote log segment metadata.
+     */
+    Optional<RemoteLogSegmentMetadata> findNextSegmentWithTxnIndex(TopicPartition tp,
+                                                                   long offset,
+                                                                   LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
+        if (leaderEpochCache == null) {
+            return Optional.empty();
+        }
+        OptionalInt initialEpochOpt = leaderEpochCache.epochForOffset(offset);
+        if (initialEpochOpt.isEmpty()) {
+            return Optional.empty();
+        }
+        int initialEpoch = initialEpochOpt.getAsInt();
+        for (EpochEntry epochEntry : leaderEpochCache.epochEntries()) {
+            if (epochEntry.epoch >= initialEpoch) {
+                long startOffset = Math.max(epochEntry.startOffset, offset);
+                Optional<RemoteLogSegmentMetadata> metadataOpt = fetchNextSegmentWithTxnIndex(tp, epochEntry.epoch, startOffset);
+                if (metadataOpt.isPresent()) {
+                    return metadataOpt;
+                }
+            }
+        }
+        return Optional.empty();
+    }
+
     // Visible for testing
     RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
         RecordBatch nextBatch;
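
For readers tracing the new control flow, the epoch walk performed by findNextSegmentWithTxnIndex condenses to the standalone sketch below. It is an illustration only, not code from this commit: EpochEntry, SegmentMetadata and Rlmm are simplified stand-ins for the leader-epoch cache entries, RemoteLogSegmentMetadata, and the RLMM-backed nextSegmentWithTxnIndex call.

    import java.util.List;
    import java.util.Optional;
    import java.util.OptionalInt;

    final class TxnIndexSearchSketch {
        record EpochEntry(int epoch, long startOffset) { }
        record SegmentMetadata(long startOffset, long endOffset) { }

        interface Rlmm {
            // Stand-in for RemoteLogMetadataManager#nextSegmentWithTxnIndex.
            Optional<SegmentMetadata> nextSegmentWithTxnIndex(int epoch, long offset);
        }

        // Walks the leader-epoch lineage starting at the epoch that contains
        // `offset`, probing each epoch for the next segment with a txn index.
        static Optional<SegmentMetadata> findNextSegmentWithTxnIndex(long offset,
                                                                     List<EpochEntry> epochEntries,
                                                                     Rlmm rlmm) {
            // Epoch of the given offset: the highest epoch whose start offset is <= offset
            // (assumes entries are in ascending epoch/start-offset order, as in the cache).
            OptionalInt initialEpoch = epochEntries.stream()
                    .filter(e -> e.startOffset() <= offset)
                    .mapToInt(EpochEntry::epoch)
                    .max();
            if (initialEpoch.isEmpty()) {
                return Optional.empty();
            }
            for (EpochEntry entry : epochEntries) {
                if (entry.epoch() >= initialEpoch.getAsInt()) {
                    // Within the initial epoch the search starts at the fetch offset;
                    // for the following epochs it starts at the epoch's start offset.
                    long startOffset = Math.max(entry.startOffset(), offset);
                    Optional<SegmentMetadata> metadata = rlmm.nextSegmentWithTxnIndex(entry.epoch(), startOffset);
                    if (metadata.isPresent()) {
                        return metadata;
                    }
                }
            }
            return Optional.empty();
        }
    }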
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 4ea373327d9..e75a6ca85d4 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -1414,6 +1414,92 @@ public class RemoteLogManagerTest {
             .remoteLogSegmentMetadata(eq(followerTopicIdPartition), anyInt(), anyLong());
     }
 
+    @Test
+    public void testFetchNextSegmentWithTxnIndex() throws RemoteStorageException {
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+            Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+        remoteLogManager.fetchNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, 100L);
+        remoteLogManager.fetchNextSegmentWithTxnIndex(followerTopicIdPartition.topicPartition(), 20, 200L);
+
+        verify(remoteLogMetadataManager)
+            .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), anyInt(), anyLong());
+        verify(remoteLogMetadataManager)
+                .nextSegmentWithTxnIndex(eq(followerTopicIdPartition), anyInt(), anyLong());
+    }
+
+    @Test
+    public void testFindNextSegmentWithTxnIndex() throws RemoteStorageException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
+                .thenReturn(Optional.of(0L));
+        when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class), anyInt(), anyLong()))
+                .thenAnswer(ans -> {
+                    TopicIdPartition topicIdPartition = ans.getArgument(0);
+                    int leaderEpoch = ans.getArgument(1);
+                    long offset = ans.getArgument(2);
+                    RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
+                    Map<Integer, Long> leaderEpochs = new TreeMap<>();
+                    leaderEpochs.put(leaderEpoch, offset);
+                    RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId,
+                            offset, offset + 100, time.milliseconds(), 0, time.milliseconds(), 1024, leaderEpochs, true);
+                    return Optional.of(metadata);
+                });
+
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
+        // For offset-10, epoch is 0.
+        remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, cache);
+        verify(remoteLogMetadataManager)
+                .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0), eq(10L));
+    }
+
+    @Test
+    public void testFindNextSegmentWithTxnIndexTraversesNextEpoch() throws RemoteStorageException {
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint, scheduler);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt()))
+                .thenReturn(Optional.of(0L));
+        when(remoteLogMetadataManager.nextSegmentWithTxnIndex(any(TopicIdPartition.class), anyInt(), anyLong()))
+                .thenAnswer(ans -> {
+                    TopicIdPartition topicIdPartition = ans.getArgument(0);
+                    int leaderEpoch = ans.getArgument(1);
+                    long offset = ans.getArgument(2);
+                    Optional<RemoteLogSegmentMetadata> metadataOpt = Optional.empty();
+                    if (leaderEpoch == 2) {
+                        RemoteLogSegmentId segmentId = new RemoteLogSegmentId(topicIdPartition, Uuid.randomUuid());
+                        Map<Integer, Long> leaderEpochs = new TreeMap<>();
+                        leaderEpochs.put(leaderEpoch, offset);
+                        RemoteLogSegmentMetadata metadata = new RemoteLogSegmentMetadata(segmentId,
+                                offset, offset + 100, time.milliseconds(), 0, time.milliseconds(), 1024, leaderEpochs, true);
+                        metadataOpt = Optional.of(metadata);
+                    }
+                    return metadataOpt;
+                });
+
+        remoteLogManager.startup();
+        remoteLogManager.onLeadershipChange(
+                Collections.singleton(mockPartition(leaderTopicIdPartition)), Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+
+        // For offset-10, epoch is 0.
+        //  1. For epoch 0 and 1, it returns empty and
+        //  2. For epoch 2, it returns the segment metadata.
+        remoteLogManager.findNextSegmentWithTxnIndex(leaderTopicIdPartition.topicPartition(), 10, cache);
+        verify(remoteLogMetadataManager)
+            .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(0), eq(10L));
+        verify(remoteLogMetadataManager)
+                .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(1), eq(100L));
+        verify(remoteLogMetadataManager)
+                .nextSegmentWithTxnIndex(eq(leaderTopicIdPartition), eq(2), eq(200L));
+    }
+
     @Test
     void testOnLeadershipChangeWillInvokeHandleLeaderOrFollowerPartitions() {
         remoteLogManager.startup();
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 78d48fbd949..2280aa51132 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -209,4 +209,25 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
      * @return Total size of the log stored in remote storage in bytes.
      */
     long remoteLogSize(TopicIdPartition topicIdPartition, int leaderEpoch) throws RemoteStorageException;
+
+    /**
+     * Returns the next segment metadata that contains the aborted transaction entries for the given topic partition, epoch and offset.
+     * <ul>
+     *     <li>The default implementation returns the segment metadata that matches the given epoch and offset
+     *     irrespective of the presence of the transaction index.</li>
+     *     <li>A custom implementation can optimize by returning the next segment metadata that contains the txn index
+     *     in the given epoch. If there are no segments with a txn index in the given epoch, then it returns empty.</li>
+     * </ul>
+     * @param topicIdPartition topic partition to search for.
+     * @param epoch leader epoch for the given offset.
+     * @param offset offset
+     * @return The next segment metadata. The transaction index may or may not exist in the returned segment metadata,
+     * depending on the RLMM plugin implementation. The caller of this method should handle both cases.
+     * @throws RemoteStorageException if any storage-related error occurs.
+     */
+    default Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+                                                                       int epoch,
+                                                                       long offset) throws RemoteStorageException {
+        return remoteLogSegmentMetadata(topicIdPartition, epoch, offset);
+    }
 }
\ No newline at end of file
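
The default above keeps existing RLMM plugins working unchanged; the second bullet in the javadoc is the optimization hook. Below is a hedged sketch of what an optimized override could look like, assuming the plugin maintains per-(partition, epoch) bookkeeping of segments whose transaction index is non-empty; segmentsWithTxnIndexFor is invented for illustration and is not part of the API.

    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.Optional;

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
    import org.apache.kafka.server.log.remote.storage.RemoteStorageException;

    abstract class TxnIndexAwareRlmm implements RemoteLogMetadataManager {

        // Hypothetical bookkeeping: per (partition, epoch), start-offset -> metadata
        // for only those segments whose transaction index is non-empty.
        protected abstract NavigableMap<Long, RemoteLogSegmentMetadata> segmentsWithTxnIndexFor(
                TopicIdPartition topicIdPartition, int epoch);

        @Override
        public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
                                                                          int epoch,
                                                                          long offset) throws RemoteStorageException {
            // Next candidate at or after the probe offset within this epoch. A complete
            // implementation would also consider a segment that starts before `offset`
            // but still covers it; this sketch keeps only the happy path.
            return Optional.ofNullable(segmentsWithTxnIndexFor(topicIdPartition, epoch).ceilingEntry(offset))
                    .map(Map.Entry::getValue);
        }
    }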
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 9b589322bbf..02918d90625 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      */
     private final RemoteLogSegmentState state;
 
+    /**
+     * Indicates whether the transaction index is empty for this segment.
+     */
+    private final boolean txnIdxEmpty;
+
     /**
      * Creates an instance with the given metadata of remote log segment.
      * <p>
@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                                     Optional<CustomMetadata> customMetadata,
                                     RemoteLogSegmentState state,
                                     Map<Integer, Long> segmentLeaderEpochs) {
+        this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes,
+                customMetadata, state, segmentLeaderEpochs, false);
+    }
+
+    /**
+     * Creates an instance with the given metadata of remote log segment.
+     * <p>
+     * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
+     * then it should have an entry with epoch mapping to start-offset of this segment.
+     *
+     * @param remoteLogSegmentId  Universally unique remote log segment id.
+     * @param startOffset         Start offset of this segment (inclusive).
+     * @param endOffset           End offset of this segment (inclusive).
+     * @param maxTimestampMs      Maximum timestamp in milliseconds in this segment.
+     * @param brokerId            Broker id from which this event is generated.
+     * @param eventTimestampMs    Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
+     * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param customMetadata      Custom metadata.
+     * @param state               State of the respective segment of remoteLogSegmentId.
+     * @param segmentLeaderEpochs leader epochs occurred within this segment.
+     * @param txnIdxEmpty         True if the transaction index is empty, false otherwise.
+     */
+    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+                                    long startOffset,
+                                    long endOffset,
+                                    long maxTimestampMs,
+                                    int brokerId,
+                                    long eventTimestampMs,
+                                    int segmentSizeInBytes,
+                                    Optional<CustomMetadata> customMetadata,
+                                    RemoteLogSegmentState state,
+                                    Map<Integer, Long> segmentLeaderEpochs,
+                                    boolean txnIdxEmpty) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
@@ -128,6 +166,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
         }
 
         this.segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new TreeMap<>(segmentLeaderEpochs));
+        this.txnIdxEmpty = txnIdxEmpty;
     }
 
     /**
@@ -164,6 +203,34 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                 segmentLeaderEpochs);
     }
 
+    /**
+     * Creates an instance with the given metadata of remote log segment and its state as {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}.
+     * <p>
+     * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
+     * then it should have an entry with epoch mapping to start-offset of this segment.
+     *
+     * @param remoteLogSegmentId  Universally unique remote log segment id.
+     * @param startOffset         Start offset of this segment (inclusive).
+     * @param endOffset           End offset of this segment (inclusive).
+     * @param maxTimestampMs      Maximum timestamp in milliseconds in this segment.
+     * @param brokerId            Broker id from which this event is generated.
+     * @param eventTimestampMs    Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
+     * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param segmentLeaderEpochs leader epochs occurred within this segment.
+     * @param txnIdxEmpty         True if the transaction index is empty, false otherwise.
+     */
+    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+                                    long startOffset,
+                                    long endOffset,
+                                    long maxTimestampMs,
+                                    int brokerId,
+                                    long eventTimestampMs,
+                                    int segmentSizeInBytes,
+                                    Map<Integer, Long> segmentLeaderEpochs,
+                                    boolean txnIdxEmpty) {
+        this(remoteLogSegmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes,
+                Optional.empty(), RemoteLogSegmentState.COPY_SEGMENT_STARTED, segmentLeaderEpochs, txnIdxEmpty);
+    }
 
     /**
      * @return unique id of this segment.
@@ -227,6 +294,14 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
         return state;
     }
 
+    /**
+     * Indicates whether the transaction index is empty.
+     * @return True if the transaction index is empty, false otherwise.
+     */
+    public boolean isTxnIdxEmpty() {
+        return txnIdxEmpty;
+    }
+
     /**
      * Creates a new RemoteLogSegmentMetadata applying the given {@code rlsmUpdate} on this instance. This method will
      * not update this instance.
@@ -241,7 +316,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
 
         return new RemoteLogSegmentMetadata(remoteLogSegmentId, startOffset,
                 endOffset, maxTimestampMs, rlsmUpdate.brokerId(), rlsmUpdate.eventTimestampMs(),
-                segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs);
+                segmentSizeInBytes, rlsmUpdate.customMetadata(), rlsmUpdate.state(), segmentLeaderEpochs, txnIdxEmpty);
     }
 
     @Override
@@ -266,13 +341,14 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                 && Objects.equals(customMetadata, that.customMetadata)
                 && state == that.state
                 && eventTimestampMs() == that.eventTimestampMs()
-                && brokerId() == that.brokerId();
+                && brokerId() == that.brokerId()
+                && txnIdxEmpty == that.txnIdxEmpty;
     }
 
     @Override
     public int hashCode() {
         return Objects.hash(remoteLogSegmentId, startOffset, endOffset, brokerId(), maxTimestampMs,
-                eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
+                eventTimestampMs(), segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state, txnIdxEmpty);
     }
 
     @Override
@@ -288,6 +364,7 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
                ", segmentSizeInBytes=" + segmentSizeInBytes +
                ", customMetadata=" + customMetadata +
                ", state=" + state +
+               ", txnIdxEmpty=" + txnIdxEmpty +
                '}';
     }
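
The constructor chain above defines the compatibility story for the new flag: metadata built through the pre-existing constructors carries txnIdxEmpty=false, the same value that old serialized events decode to, so false means "the index may or may not exist" while true is authoritative. A minimal sketch of that distinction, with arbitrary values:

    import java.util.Map;

    import org.apache.kafka.common.TopicIdPartition;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;

    public class TxnIdxFlagExample {
        public static void main(String[] args) {
            RemoteLogSegmentId segmentId = new RemoteLogSegmentId(
                    new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("orders", 0)),
                    Uuid.randomUuid());
            Map<Integer, Long> epochs = Map.of(0, 0L);

            // Old-style construction: the flag silently defaults to false, matching
            // what pre-KIP-1058 events in __remote_log_metadata deserialize to.
            RemoteLogSegmentMetadata legacy = new RemoteLogSegmentMetadata(
                    segmentId, 0L, 99L, -1L, 1, 123L, 1024, epochs);
            System.out.println(legacy.isTxnIdxEmpty());   // false: index *may* exist

            // New-style construction: the copy path records the real state of the index.
            RemoteLogSegmentMetadata current = new RemoteLogSegmentMetadata(
                    segmentId, 0L, 99L, -1L, 1, 123L, 1024, epochs, true);
            System.out.println(current.isTxnIdxEmpty());  // true: safe to skip this segment
        }
    }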
 
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
index 6ffcf85b9b0..cb9491360f8 100644
--- a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -79,6 +79,11 @@ public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
         return 0;
     }
 
+    @Override
+    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) {
+        return Optional.empty();
+    }
+
     @Override
     public void close() throws IOException {
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
index dc2678242f4..1abcbbc20ce 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -106,6 +106,11 @@ public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetada
         return withClassLoader(() -> delegate.remoteLogSize(topicIdPartition, leaderEpoch));
     }
 
+    @Override
+    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset));
+    }
+
     @Override
     public void configure(Map<String, ?> configs) {
         withClassLoader(() -> {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 2ee1c29f6e5..b08458b84b8 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -125,29 +125,45 @@ public class RemoteLogMetadataCache {
      * @return the requested remote log segment metadata if it exists.
      */
     public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch, long offset) {
-        RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch);
-
-        if (remoteLogLeaderEpochState == null) {
-            return Optional.empty();
+        RemoteLogSegmentMetadata metadata = getSegmentMetadata(leaderEpoch, offset);
+        long epochEndOffset = -1L;
+        if (metadata != null) {
+            // Check whether the given offset with leaderEpoch exists in this segment.
+            // Check for epoch's offset boundaries with in this segment.
+            //   1. Get the next epoch's start offset -1 if exists
+            //   2. If no next epoch exists, then segment end offset can be considered as epoch's relative end offset.
+            Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
+            epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : metadata.endOffset();
         }
+        // Return empty when target offset > epoch's end offset.
+        return offset > epochEndOffset ? Optional.empty() : Optional.ofNullable(metadata);
+    }
 
-        // Look for floor entry as the given offset may exist in this entry.
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogLeaderEpochState.floorEntry(offset);
-        if (remoteLogSegmentId == null) {
-            // If the offset is lower than the minimum offset available in metadata then return empty.
-            return Optional.empty();
+    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(int leaderEpoch, long offset) {
+        boolean txnIdxEmpty = true;
+        Optional<RemoteLogSegmentMetadata> metadataOpt = remoteLogSegmentMetadata(leaderEpoch, offset);
+        while (metadataOpt.isPresent() && txnIdxEmpty) {
+            txnIdxEmpty = metadataOpt.get().isTxnIdxEmpty();
+            if (txnIdxEmpty) { // If txn index is empty, then look for next segment.
+                metadataOpt = remoteLogSegmentMetadata(leaderEpoch, metadataOpt.get().endOffset() + 1);
+            }
         }
+        return txnIdxEmpty ? Optional.empty() : metadataOpt;
+    }
 
-        RemoteLogSegmentMetadata metadata = idToSegmentMetadata.get(remoteLogSegmentId);
-        // Check whether the given offset with leaderEpoch exists in this segment.
-        // Check for epoch's offset boundaries with in this segment.
-        //      1. Get the next epoch's start offset -1 if exists
-        //      2. If no next epoch exists, then segment end offset can be considered as epoch's relative end offset.
-        Map.Entry<Integer, Long> nextEntry = metadata.segmentLeaderEpochs().higherEntry(leaderEpoch);
-        long epochEndOffset = (nextEntry != null) ? nextEntry.getValue() - 1 : metadata.endOffset();
-
-        // Return empty when target offset > epoch's end offset.
-        return offset > epochEndOffset ? Optional.empty() : Optional.of(metadata);
+    private RemoteLogSegmentMetadata getSegmentMetadata(int leaderEpoch, long offset) {
+        RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch);
+        if (remoteLogLeaderEpochState != null) {
+            // Look for floor entry as the given offset may exist in this entry.
+            RemoteLogSegmentId remoteLogSegmentId = remoteLogLeaderEpochState.floorEntry(offset);
+            if (remoteLogSegmentId != null) {
+                return idToSegmentMetadata.get(remoteLogSegmentId);
+            } else {
+                log.warn("No remote segment found for leaderEpoch: {}, offset: {}", leaderEpoch, offset);
+            }
+        }
+        // If the offset is lower than the minimum offset available in metadata then return null.
+        return null;
     }
 
     public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metadataUpdate)
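
The skip loop above terminates because remoteLogSegmentMetadata() returns empty once the probe offset moves past the epoch's last covered offset. The standalone emulation below illustrates this; Seg and the TreeMap are simplified stand-ins for RemoteLogSegmentMetadata and the per-epoch offset index, not code from this commit:

    import java.util.Map;
    import java.util.Optional;
    import java.util.TreeMap;

    final class CacheWalkSketch {
        record Seg(long startOffset, long endOffset, boolean txnIdxEmpty) { }

        // Floor lookup bounded by the segment's end offset, mirroring the epoch-end
        // check in remoteLogSegmentMetadata(): probing past the last covered offset
        // yields empty, which is what terminates the walk below.
        static Optional<Seg> lookup(TreeMap<Long, Seg> segs, long offset) {
            Map.Entry<Long, Seg> e = segs.floorEntry(offset);
            return (e == null || offset > e.getValue().endOffset()) ? Optional.empty() : Optional.of(e.getValue());
        }

        static Optional<Seg> nextSegmentWithTxnIndex(TreeMap<Long, Seg> segs, long offset) {
            boolean txnIdxEmpty = true;
            Optional<Seg> metadataOpt = lookup(segs, offset);
            while (metadataOpt.isPresent() && txnIdxEmpty) {
                txnIdxEmpty = metadataOpt.get().txnIdxEmpty();
                if (txnIdxEmpty) { // Empty index: advance to the segment after this one.
                    metadataOpt = lookup(segs, metadataOpt.get().endOffset() + 1);
                }
            }
            return txnIdxEmpty ? Optional.empty() : metadataOpt;
        }

        public static void main(String[] args) {
            TreeMap<Long, Seg> segments = new TreeMap<>();
            segments.put(0L, new Seg(0, 99, true));       // no aborted txns recorded
            segments.put(100L, new Seg(100, 199, false)); // txn index present
            // Skips [0..99] and returns [100..199].
            System.out.println(nextSegmentWithTxnIndex(segments, 10L));
        }
    }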
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
index a1ba23dd8f8..64540a7fabd 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -78,6 +78,11 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
      */
     private final RemoteLogSegmentState state;
 
+    /**
+     * Indicates whether the transaction index is empty for this segment.
+     */
+    private final boolean txnIdxEmpty;
+
     /**
      * Creates an instance with the given metadata of remote log segment.
      * <p>
@@ -105,6 +110,39 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
                                             Optional<CustomMetadata> customMetadata,
                                             RemoteLogSegmentState state,
                                             Map<Integer, Long> segmentLeaderEpochs) {
+        this(segmentId, startOffset, endOffset, maxTimestampMs, brokerId, eventTimestampMs, segmentSizeInBytes,
+                customMetadata, state, segmentLeaderEpochs, false);
+    }
+
+    /**
+     * Creates an instance with the given metadata of remote log segment.
+     * <p>
+     * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
+     * then it should have an entry with epoch mapping to start-offset of this segment.
+     *
+     * @param segmentId           Universally unique remote log segment id.
+     * @param startOffset         Start offset of this segment (inclusive).
+     * @param endOffset           End offset of this segment (inclusive).
+     * @param maxTimestampMs      Maximum timestamp in milliseconds in this segment.
+     * @param brokerId            Broker id from which this event is generated.
+     * @param eventTimestampMs    Epoch time in milliseconds at which the remote log segment is copied to the remote tier storage.
+     * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param customMetadata      Custom metadata.
+     * @param state               State of the respective segment of remoteLogSegmentId.
+     * @param segmentLeaderEpochs leader epochs occurred within this segment.
+     * @param txnIdxEmpty         true if the transaction index is empty, false otherwise.
+     */
+    public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
+                                            long startOffset,
+                                            long endOffset,
+                                            long maxTimestampMs,
+                                            int brokerId,
+                                            long eventTimestampMs,
+                                            int segmentSizeInBytes,
+                                            Optional<CustomMetadata> customMetadata,
+                                            RemoteLogSegmentState state,
+                                            Map<Integer, Long> segmentLeaderEpochs,
+                                            boolean txnIdxEmpty) {
         super(brokerId, eventTimestampMs);
         this.segmentId = Objects.requireNonNull(segmentId, "remoteLogSegmentId can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
@@ -114,6 +152,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
         this.maxTimestampMs = maxTimestampMs;
         this.segmentSizeInBytes = segmentSizeInBytes;
         this.customMetadata = Objects.requireNonNull(customMetadata, "customMetadata can not be null");
+        this.txnIdxEmpty = txnIdxEmpty;
 
         if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
             throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
@@ -125,7 +164,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
     public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) {
         return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(),
                                                     metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(),
-                                                    metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs()
+                                                    metadata.segmentSizeInBytes(), metadata.customMetadata(), metadata.state(), metadata.segmentLeaderEpochs(), metadata.isTxnIdxEmpty()
         );
     }
 
@@ -191,6 +230,10 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
         return state;
     }
 
+    public boolean isTxnIdxEmpty() {
+        return txnIdxEmpty;
+    }
+
     @Override
     public TopicIdPartition topicIdPartition() {
         throw new UnsupportedOperationException("This metadata does not have topic partition with it.");
@@ -208,12 +251,13 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
                 && Objects.equals(customMetadata, that.customMetadata)
                 && Objects.equals(segmentId, that.segmentId)
                 && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs)
-                && state == that.state;
+                && state == that.state
+                && txnIdxEmpty == that.txnIdxEmpty;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state);
+        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, customMetadata, state, txnIdxEmpty);
     }
 
     @Override
@@ -227,6 +271,7 @@ public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
                 ", segmentSizeInBytes=" + segmentSizeInBytes +
                 ", customMetadata=" + customMetadata +
                 ", state=" + state +
+                ", txnIdxEmpty=" + txnIdxEmpty +
                 '}';
     }
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index 86c29567d36..c74e97fbe68 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -55,7 +55,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
 
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
-        log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
+        log.debug("Adding remote log segment: {}", remoteLogSegmentMetadata);
 
         final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
         TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
@@ -71,7 +71,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
 
     @Override
     public void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate rlsmUpdate) {
-        log.debug("Updating remote log segment: [{}]", rlsmUpdate);
+        log.debug("Updating remote log segment: {}", rlsmUpdate);
         RemoteLogSegmentId remoteLogSegmentId = rlsmUpdate.remoteLogSegmentId();
         TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
         RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
@@ -88,7 +88,7 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
 
     @Override
     public void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
-        log.debug("Received partition delete state with: [{}]", remotePartitionDeleteMetadata);
+        log.debug("Received partition delete state with: {}", remotePartitionDeleteMetadata);
 
         TopicIdPartition topicIdPartition = remotePartitionDeleteMetadata.topicIdPartition();
         idToPartitionDeleteMetadata.put(topicIdPartition, remotePartitionDeleteMetadata);
@@ -108,30 +108,25 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
 
     public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
             throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
         return getRemoteLogMetadataCache(topicIdPartition).listAllRemoteLogSegments();
     }
 
     public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch)
             throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
         return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
     }
 
     private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
             throws RemoteResourceNotFoundException {
+        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
         RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
         if (remoteLogMetadataCache == null) {
             throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
         }
-
         if (!remoteLogMetadataCache.isInitialized()) {
             // Throwing a retriable ReplicaNotAvailableException here for clients retry.
             throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
         }
-
         return remoteLogMetadataCache;
     }
 
@@ -139,15 +134,17 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
                                                                        long offset,
                                                                        int epochForOffset)
             throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
         return getRemoteLogMetadataCache(topicIdPartition).remoteLogSegmentMetadata(epochForOffset, offset);
     }
 
+    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition,
+                                                                      int epoch,
+                                                                      long offset) throws RemoteStorageException {
+        return getRemoteLogMetadataCache(topicIdPartition).nextSegmentWithTxnIndex(epoch, offset);
+    }
+
     public Optional<Long> highestLogOffset(TopicIdPartition topicIdPartition,
                                            int leaderEpoch) throws RemoteStorageException {
-        Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
-
         return getRemoteLogMetadataCache(topicIdPartition).highestOffsetForEpoch(leaderEpoch);
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index eff498b5437..a5db1ea38ef 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -356,6 +356,17 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
         return remoteLogSize;
     }
 
+    @Override
+    public Optional<RemoteLogSegmentMetadata> nextSegmentWithTxnIndex(TopicIdPartition topicIdPartition, int epoch, long offset) throws RemoteStorageException {
+        lock.readLock().lock();
+        try {
+            ensureInitializedAndNotClosed();
+            return remotePartitionMetadataStore.nextSegmentWithTxnIndex(topicIdPartition, epoch, offset);
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
     @Override
     public void configure(Map<String, ?> configs) {
         Objects.requireNonNull(configs, "configs can not be null.");
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
index ad47ee05c84..4e839d08b3a 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -40,7 +40,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad
                 .setMaxTimestampMs(segmentMetadata.maxTimestampMs())
                 .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
                 .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
-                .setRemoteLogSegmentState(segmentMetadata.state().id());
+                .setRemoteLogSegmentState(segmentMetadata.state().id())
+                .setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty());
         segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, record.highestSupportedVersion());
@@ -72,7 +73,8 @@ public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetad
                                                     record.segmentSizeInBytes(),
                                                     customMetadata,
                                                     RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
-                                                    segmentLeaderEpochs);
+                                                    segmentLeaderEpochs,
+                                                    record.txnIndexEmpty());
     }
 
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
index 9e893d2cbc3..99f3fc0d90c 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
@@ -44,7 +44,8 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
                 .setMaxTimestampMs(segmentMetadata.maxTimestampMs())
                 .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
                 .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
-                .setRemoteLogSegmentState(segmentMetadata.state().id());
+                .setRemoteLogSegmentState(segmentMetadata.state().id())
+                .setTxnIndexEmpty(segmentMetadata.isTxnIdxEmpty());
         segmentMetadata.customMetadata().ifPresent(md -> record.setCustomMetadata(md.value()));
 
         return new ApiMessageAndVersion(record, record.highestSupportedVersion());
@@ -83,7 +84,7 @@ public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTrans
                 new RemoteLogSegmentMetadata(remoteLogSegmentId, record.startOffset(), record.endOffset(),
                                              record.maxTimestampMs(), record.brokerId(),
                                              record.eventTimestampMs(), record.segmentSizeInBytes(),
-                                             segmentLeaderEpochs);
+                                             segmentLeaderEpochs, record.txnIndexEmpty());
         RemoteLogSegmentMetadataUpdate rlsmUpdate
                 = new RemoteLogSegmentMetadataUpdate(remoteLogSegmentId, record.eventTimestampMs(),
                                                      customMetadata,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
index 10d1449cdb7..8e089dc3cfc 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java
@@ -194,6 +194,14 @@ public class TransactionIndex implements Closeable {
         }
     }
 
+    /**
+     * Check if the index is empty.
+     * @return `true` if the index is empty or the underlying file does not exist, `false` otherwise.
+     */
+    public boolean isEmpty() {
+        return !iterable().iterator().hasNext();
+    }
+
+
     private FileChannel openChannel() throws IOException {
         FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE,
                 StandardOpenOption.READ, StandardOpenOption.WRITE);
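
This accessor is what the copy path consults (segment.txnIndex().isEmpty() in RemoteLogManager above) before publishing the segment's metadata event. A minimal usage sketch, assuming the two-argument TransactionIndex constructor taking a base offset and an index file; the path and offsets are hypothetical:

    import java.io.File;
    import java.io.IOException;

    import org.apache.kafka.storage.internals.log.TransactionIndex;

    public class TxnIndexEmptyCheck {
        public static void main(String[] args) throws IOException {
            // Hypothetical index file for a segment with base offset 0. A fresh
            // (or absent) file has no AbortedTxn entries, so isEmpty() returns true.
            TransactionIndex txnIndex = new TransactionIndex(0L, new File("/tmp/00000000000000000000.txnindex"));
            boolean txnIdxEmpty = txnIndex.isEmpty();
            // The copy path stores this flag in RemoteLogSegmentMetadata so readers
            // can skip the remote fetch of an index that is known to be empty.
            System.out.println("txnIdxEmpty = " + txnIdxEmpty);
            txnIndex.close();
        }
    }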
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
index c737135a6a2..9c035f52630 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataRecord.json
@@ -129,6 +129,14 @@
       "type": "int8",
       "versions": "0+",
       "about": "State identifier of the remote log segment, which is RemoteLogSegmentState.id()."
+    },
+    {
+      "name": "TxnIndexEmpty",
+      "type": "bool",
+      "versions": "0+",
+      "about": "Flag to indicate if the transaction index is empty.",
+      "taggedVersions": "0+",
+      "tag": 0
     }
   ]
 }
\ No newline at end of file
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
index 20fb1732572..f4a1f19dca4 100644
--- a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -95,6 +95,14 @@
       "type": "int8",
       "versions": "0+",
       "about": "State of the remote log segment"
+    },
+    {
+      "name": "TxnIndexEmpty",
+      "type": "bool",
+      "versions": "0+",
+      "about": "Flag to indicate if the transaction index is empty.",
+      "taggedVersions": "0+",
+      "tag": 0
     }
   ]
 }
\ No newline at end of file
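
In both schemas the new field is tagged (tag 0, taggedVersions 0+), so records serialized before this change still decode: the generated accessor falls back to the field's default, false, which is exactly the ambiguity the ofNullable() comment in RemoteLogManager guards against. A sketch of that decode-side assumption; the generated class and its package are assumed from the storage module's message generator:

    // Assumed import: the class is generated from RemoteLogSegmentMetadataRecord.json.
    import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;

    public class TaggedFieldDefaultSketch {
        public static void main(String[] args) {
            // A record built without touching the new field, as any pre-upgrade
            // serialization effectively does: the tagged bool falls back to false.
            RemoteLogSegmentMetadataRecord record = new RemoteLogSegmentMetadataRecord();
            System.out.println(record.txnIndexEmpty()); // false
            // This is why getTransactionIndex() in RemoteLogManager still probes the
            // remote store with ofNullable() when the flag reads false.
        }
    }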
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
index d6a03441e8b..47925d01a7d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataFormatterTest.java
@@ -53,7 +53,7 @@ public class RemoteLogMetadataFormatterTest {
         Optional<CustomMetadata> customMetadata = Optional.of(new 
CustomMetadata(new byte[10]));
         RemoteLogSegmentMetadata remoteLogMetadata = new 
RemoteLogSegmentMetadata(
                 remoteLogSegmentId, 0L, 100L, -1L, 1, 123L, 1024, 
customMetadata, COPY_SEGMENT_STARTED,
-                segLeaderEpochs);
+                segLeaderEpochs, true);
 
         byte[] metadataBytes = new 
RemoteLogMetadataSerde().serialize(remoteLogMetadata);
         ConsumerRecord<byte[], byte[]> metadataRecord = new ConsumerRecord<>(
@@ -65,7 +65,7 @@ public class RemoteLogMetadataFormatterTest {
                         "startOffset=0, endOffset=100, brokerId=1, 
maxTimestampMs=-1, " +
                         "eventTimestampMs=123, segmentLeaderEpochs={0=0, 1=20, 
2=80}, segmentSizeInBytes=1024, " +
                         "customMetadata=Optional[CustomMetadata{10 bytes}], " +
-                        "state=COPY_SEGMENT_STARTED}\n",
+                        "state=COPY_SEGMENT_STARTED, txnIdxEmpty=true}\n",
                 TOPIC_ID, SEGMENT_ID);
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
              PrintStream ps = new PrintStream(baos)) {
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
index b4c4110ecea..399529129ff 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransformTest.java
@@ -35,7 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 class RemoteLogSegmentMetadataSnapshotTransformTest {
     @ParameterizedTest
     @MethodSource("parameters")
-    void testToAndFromMessage(Optional<CustomMetadata> customMetadata) {
+    void testToAndFromMessage(Optional<CustomMetadata> customMetadata, boolean 
isTxnIdxEmpty) {
         Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
         segmentLeaderEpochs.put(0, 0L);
         RemoteLogSegmentMetadataSnapshot snapshot = new 
RemoteLogSegmentMetadataSnapshot(
@@ -43,7 +43,8 @@ class RemoteLogSegmentMetadataSnapshotTransformTest {
                 0L, 100L, -1L, 0, 0, 1234,
                 customMetadata,
                 RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
-                segmentLeaderEpochs
+                segmentLeaderEpochs,
+                isTxnIdxEmpty
         );
 
         RemoteLogSegmentMetadataSnapshotTransform transform = new RemoteLogSegmentMetadataSnapshotTransform();
@@ -51,11 +52,11 @@ class RemoteLogSegmentMetadataSnapshotTransformTest {
         assertEquals(snapshot, transform.fromApiMessageAndVersion(message));
     }
 
-    private static Stream<Object> parameters() {
+    private static Stream<Object[]> parameters() {
         return Stream.of(
-                Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})),
-                Optional.of(new CustomMetadata(new byte[0])),
-                Optional.empty()
+                new Object[]{Optional.of(new CustomMetadata(new byte[]{0, 1, 2, 3})), true},
+                new Object[]{Optional.of(new CustomMetadata(new byte[0])), false},
+                new Object[]{Optional.empty(), true}
         );
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
index 7a75ab0cced..1d19d433e05 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/TransactionIndexTest.java
@@ -202,4 +202,24 @@ public class TransactionIndexTest {
         index.deleteIfExists();
         assertFalse(file.exists());
     }
+
+    @Test
+    public void testIsEmptyWhenFileDoesNotExist() throws IOException {
+        File nonExistentFile = TestUtils.tempFile();
+        assertTrue(nonExistentFile.delete());
+        try (TransactionIndex testIndex = new TransactionIndex(0, nonExistentFile)) {
+            assertTrue(testIndex.isEmpty());
+        }
+    }
+
+    @Test
+    public void testIsEmptyWhenFileIsEmpty() {
+        assertTrue(index.isEmpty());
+    }
+
+    @Test
+    public void testIsEmptyWhenFileIsNotEmpty() throws IOException {
+        index.append(new AbortedTxn(0L, 0, 10, 2));
+        assertFalse(index.isEmpty());
+    }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
index a93dea813d1..c51405c8636 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java
@@ -53,6 +53,7 @@ import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
 import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
 import org.apache.kafka.tiered.storage.specs.ProducableSpec;
 import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
 import org.apache.kafka.tiered.storage.specs.TopicSpec;
 
@@ -228,10 +229,17 @@ public final class TieredStorageTestBuilder {
     public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker,
                                                                  String topic,
                                                                  Integer partition,
-                                                                 Integer remoteFetchRequestCount) {
+                                                                 Integer segmentFetchRequestCount) {
+        return expectFetchFromTieredStorage(fromBroker, topic, partition, new RemoteFetchCount(segmentFetchRequestCount));
+    }
+
+    public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker,
+                                                                 String topic,
+                                                                 Integer partition,
+                                                                 RemoteFetchCount remoteFetchRequestCount) {
         TopicPartition topicPartition = new TopicPartition(topic, partition);
         assertTrue(partition >= 0, "Partition must be >= 0");
-        assertTrue(remoteFetchRequestCount >= 0, "Expected fetch count from tiered storage must be >= 0");
+        assertTrue(remoteFetchRequestCount.getSegmentFetchCountAndOp().getCount() >= 0, "Expected fetch count from tiered storage must be >= 0");
         assertFalse(fetchables.containsKey(topicPartition), "Consume already in progress for " + topicPartition);
         fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount));
         return this;
@@ -371,7 +379,7 @@ public final class TieredStorageTestBuilder {
     private void createConsumeAction() {
         if (!consumables.isEmpty()) {
             consumables.forEach((topicPartition, consumableSpec) -> {
-                FetchableSpec fetchableSpec = fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, 0));
+                FetchableSpec fetchableSpec = fetchables.computeIfAbsent(topicPartition, k -> new FetchableSpec(0, new RemoteFetchCount(0)));
                 RemoteFetchSpec remoteFetchSpec = new RemoteFetchSpec(fetchableSpec.getSourceBrokerId(), topicPartition,
                         fetchableSpec.getFetchCount());
                 ConsumeAction action = new ConsumeAction(topicPartition, consumableSpec.getFetchOffset(),
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 97feae05654..4b4c401b1d2 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -92,12 +92,16 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
 
     protected abstract void writeTestSpecifications(TieredStorageTestBuilder builder);
 
+    protected void overrideConsumerConfig(Properties consumerConfig) {
+    }
+
     @BeforeEach
     @Override
     public void setUp(TestInfo testInfo) {
         testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
         storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
         super.setUp(testInfo);
+        overrideConsumerConfig(consumerConfig());
         context = new TieredStorageTestContext(this);
     }
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index 250db6a46e8..9549a6b6916 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -25,20 +25,26 @@ import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
 import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
 
 import java.io.PrintStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_OFFSET_INDEX;
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TIME_INDEX;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_TRANSACTION_INDEX;
 import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
 import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
 public final class ConsumeAction implements TieredStorageTestAction {
@@ -75,6 +81,9 @@ public final class ConsumeAction implements TieredStorageTestAction {
         // type has yet to happen.
         LocalTieredStorageHistory history = context.tieredStorageHistory(remoteFetchSpec.getSourceBrokerId());
         Optional<LocalTieredStorageEvent> latestEventSoFar = history.latestEvent(FETCH_SEGMENT, topicPartition);
+        Optional<LocalTieredStorageEvent> latestOffsetIdxEventSoFar = history.latestEvent(FETCH_OFFSET_INDEX, topicPartition);
+        Optional<LocalTieredStorageEvent> latestTimeIdxEventSoFar = history.latestEvent(FETCH_TIME_INDEX, topicPartition);
+        Optional<LocalTieredStorageEvent> latestTxnIdxEventSoFar = history.latestEvent(FETCH_TRANSACTION_INDEX, topicPartition);
 
         // Records are consumed here
         List<ConsumerRecord<String, String>> consumedRecords =
@@ -119,16 +128,61 @@ public final class ConsumeAction implements TieredStorageTestAction {
         assertThat(storedRecords, correspondTo(readRecords, topicPartition, serde, serde));
 
         // (B) Assessment of the interactions between the source broker and the second-tier storage.
-        List<LocalTieredStorageEvent> events = 
history.getEvents(FETCH_SEGMENT, topicPartition);
-        List<LocalTieredStorageEvent> eventsInScope = latestEventSoFar
-                .map(latestEvent ->
-                        events.stream().filter(event -> 
event.isAfter(latestEvent)).collect(Collectors.toList()))
-                .orElse(events);
-
-        assertEquals(remoteFetchSpec.getCount(), eventsInScope.size(),
-                "Number of fetch requests from broker " + 
remoteFetchSpec.getSourceBrokerId() + " to the " +
-                        "tier storage does not match the expected value for 
topic-partition "
-                        + remoteFetchSpec.getTopicPartition());
+        for (LocalTieredStorageEvent.EventType eventType : 
Arrays.asList(FETCH_SEGMENT, FETCH_OFFSET_INDEX, FETCH_TIME_INDEX, 
FETCH_TRANSACTION_INDEX)) {
+            Optional<LocalTieredStorageEvent> latestEvent;
+            switch (eventType) {
+                case FETCH_SEGMENT:
+                    latestEvent = latestEventSoFar;
+                    break;
+                case FETCH_OFFSET_INDEX:
+                    latestEvent = latestOffsetIdxEventSoFar;
+                    break;
+                case FETCH_TIME_INDEX:
+                    latestEvent = latestTimeIdxEventSoFar;
+                    break;
+                case FETCH_TRANSACTION_INDEX:
+                    latestEvent = latestTxnIdxEventSoFar;
+                    break;
+                default:
+                    latestEvent = Optional.empty();
+            }
+
+            List<LocalTieredStorageEvent> events = 
history.getEvents(eventType, topicPartition);
+            List<LocalTieredStorageEvent> eventsInScope = latestEvent
+                    .map(e -> events.stream().filter(event -> 
event.isAfter(e)).collect(Collectors.toList()))
+                    .orElse(events);
+
+            RemoteFetchCount remoteFetchCount = 
remoteFetchSpec.getRemoteFetchCount();
+            RemoteFetchCount.FetchCountAndOp expectedCountAndOp;
+            switch (eventType) {
+                case FETCH_SEGMENT:
+                    expectedCountAndOp = 
remoteFetchCount.getSegmentFetchCountAndOp();
+                    break;
+                case FETCH_OFFSET_INDEX:
+                    expectedCountAndOp = 
remoteFetchCount.getOffsetIdxFetchCountAndOp();
+                    break;
+                case FETCH_TIME_INDEX:
+                    expectedCountAndOp = 
remoteFetchCount.getTimeIdxFetchCountAndOp();
+                    break;
+                case FETCH_TRANSACTION_INDEX:
+                    expectedCountAndOp = 
remoteFetchCount.getTxnIdxFetchCountAndOp();
+                    break;
+                default:
+                    expectedCountAndOp = new 
RemoteFetchCount.FetchCountAndOp(-1, RemoteFetchCount.OperationType.EQUALS_TO);
+            }
+
+            String message = String.format("Number of %s requests from broker 
%d to the tier storage does not match the expected value for topic-partition 
%s",
+                    eventType, remoteFetchSpec.getSourceBrokerId(), 
remoteFetchSpec.getTopicPartition());
+            if (expectedCountAndOp.getCount() != -1) {
+                if (expectedCountAndOp.getOperationType() == 
RemoteFetchCount.OperationType.EQUALS_TO) {
+                    assertEquals(expectedCountAndOp.getCount(), 
eventsInScope.size(), message);
+                } else if (expectedCountAndOp.getOperationType() == 
RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO) {
+                    assertTrue(eventsInScope.size() <= 
expectedCountAndOp.getCount(), message);
+                } else {
+                    assertTrue(eventsInScope.size() >= 
expectedCountAndOp.getCount(), message);
+                }
+            }
+        }
     }
 
     @Override
@@ -138,5 +192,6 @@ public final class ConsumeAction implements TieredStorageTestAction {
         output.println("  fetch-offset = " + fetchOffset);
         output.println("  expected-record-count = " + expectedTotalCount);
         output.println("  expected-record-from-tiered-storage = " + 
expectedFromSecondTierCount);
+        output.println("  remote-fetch-spec = " + remoteFetchSpec);
     }
 }
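
A note on the assertion loop above: a FetchCountAndOp count of -1 acts as a sentinel meaning "do not verify this event type". The index-fetch expectations in RemoteFetchCount default to -1 (see the new spec class below), so existing tests that only specify a segment fetch count continue to pass unchanged.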
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
new file mode 100644
index 00000000000..38b7ae3df2d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndTxnConsumeFromLeaderTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.FetchCountAndOp;
+import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.EQUALS_TO;
+import static org.apache.kafka.tiered.storage.specs.RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO;
+
+/**
+ * Test Cases:
+ *    Elementary offloads and fetches from tiered storage using a consumer with the read_committed isolation level.
+ */
+public final class OffloadAndTxnConsumeFromLeaderTest extends TieredStorageTestHarness {
+
+    /**
+     * Cluster of one broker
+     * @return number of brokers in the cluster
+     */
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    public Properties overridingProps() {
+        Properties props = super.overridingProps();
+        // Configure the remote-log index cache size to hold one entry to simulate eviction of cached index entries.
+        props.put(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP, "1");
+        return props;
+    }
+
+    @Override
+    protected void overrideConsumerConfig(Properties consumerConfig) {
+        consumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker = 0;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 1;
+        final Integer oneBatchPerSegment = 1;
+        final Map<Integer, List<Integer>> replicaAssignment = null;
+        final boolean enableRemoteLogStorage = true;
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, oneBatchPerSegment, replicaAssignment,
+                        enableRemoteLogStorage)
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 2, new KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 3, new KeyValueSpec("k3", "v3"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 4, new KeyValueSpec("k4", "v4"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 5, new KeyValueSpec("k5", "v5"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 6L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"),
+                        new KeyValueSpec("k5", "v5"), new KeyValueSpec("k6", "v6"))
+                // When reading with a transactional consumer, the remote indexes of consecutive segments are
+                // fetched until the LSO found is higher than the fetch offset.
+                // Worst case: summation(n) = (n * (n + 1)) / 2 index fetches.
+                // With 6 uploaded remote segments, that is (6 * (6 + 1)) / 2 = 21 index fetches.
+                // Note that the index fetch is skipped when the txn-index is empty, so the effective index
+                // fetch count should be the same as the segment count.
+                .expectFetchFromTieredStorage(broker, topicA, p0, getRemoteFetchCount())
+                .consume(topicA, p0, 0L, 7, 6);
+    }
+
+    private static RemoteFetchCount getRemoteFetchCount() {
+        FetchCountAndOp segmentFetchCountAndOp = new FetchCountAndOp(6, EQUALS_TO);
+        // The RemoteIndexCache might evict entries well before reaching its maximum size. To keep the test
+        // deterministic, we use LESS_THAN_OR_EQUALS_TO with an upper bound equal to the number of times the
+        // RemoteIndexCache gets accessed; the cache is accessed twice for each read.
+        FetchCountAndOp indexFetchCountAndOp = new FetchCountAndOp(12, LESS_THAN_OR_EQUALS_TO);
+        return new RemoteFetchCount(segmentFetchCountAndOp, indexFetchCountAndOp,
+                indexFetchCountAndOp, indexFetchCountAndOp);
+    }
+}
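
As a quick sanity check of the arithmetic in the comments above, a standalone snippet (not part of the commit):

    public class FetchCountArithmetic {
        public static void main(String[] args) {
            int segments = 6;
            // Worst case without the TxnIndexEmpty flag: consecutive segments' txn indexes
            // are fetched until the LSO is passed, i.e. summation(n) = n * (n + 1) / 2.
            int worstCaseIndexFetches = segments * (segments + 1) / 2;
            // Upper bound asserted by the test: the RemoteIndexCache is accessed twice per read.
            int assertedBound = 2 * segments;
            System.out.println(worstCaseIndexFetches + " worst-case index fetches; test asserts <= " + assertedBound);
            // Prints: 21 worst-case index fetches; test asserts <= 12
        }
    }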
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
index 48dd34052b7..9e2f23af7a7 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/FetchableSpec.java
@@ -21,10 +21,10 @@ import java.util.Objects;
 public final class FetchableSpec {
 
     private final Integer sourceBrokerId;
-    private final Integer fetchCount;
+    private final RemoteFetchCount fetchCount;
 
     public FetchableSpec(Integer sourceBrokerId,
-                         Integer fetchCount) {
+                         RemoteFetchCount fetchCount) {
         this.sourceBrokerId = sourceBrokerId;
         this.fetchCount = fetchCount;
     }
@@ -33,7 +33,7 @@ public final class FetchableSpec {
         return sourceBrokerId;
     }
 
-    public Integer getFetchCount() {
+    public RemoteFetchCount getFetchCount() {
         return fetchCount;
     }
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
new file mode 100644
index 00000000000..8dfc9a762b8
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchCount.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.specs;
+
+import java.util.Objects;
+
+public class RemoteFetchCount {
+    private FetchCountAndOp segmentFetchCountAndOp;
+    private FetchCountAndOp offsetIdxFetchCountAndOp = new FetchCountAndOp(-1);
+    private FetchCountAndOp timeIdxFetchCountAndOp = new FetchCountAndOp(-1);
+    private FetchCountAndOp txnIdxFetchCountAndOp = new FetchCountAndOp(-1);
+
+    public RemoteFetchCount(int segmentFetchCountAndOp) {
+        this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCountAndOp);
+    }
+
+    public RemoteFetchCount(int segmentFetchCountAndOp,
+                            int offsetIdxFetchCountAndOp,
+                            int timeIdxFetchCountAndOp,
+                            int txnIdxFetchCountAndOp) {
+        this.segmentFetchCountAndOp = new FetchCountAndOp(segmentFetchCountAndOp);
+        this.offsetIdxFetchCountAndOp = new FetchCountAndOp(offsetIdxFetchCountAndOp);
+        this.timeIdxFetchCountAndOp = new FetchCountAndOp(timeIdxFetchCountAndOp);
+        this.txnIdxFetchCountAndOp = new FetchCountAndOp(txnIdxFetchCountAndOp);
+    }
+
+    public RemoteFetchCount(FetchCountAndOp segmentFetchCountAndOp,
+                            FetchCountAndOp offsetIdxFetchCountAndOp,
+                            FetchCountAndOp timeIdxFetchCountAndOp,
+                            FetchCountAndOp txnIdxFetchCountAndOp) {
+        this.segmentFetchCountAndOp = segmentFetchCountAndOp;
+        this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp;
+        this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp;
+        this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp;
+    }
+
+    public FetchCountAndOp getSegmentFetchCountAndOp() {
+        return segmentFetchCountAndOp;
+    }
+
+    public void setSegmentFetchCountAndOp(FetchCountAndOp segmentFetchCountAndOp) {
+        this.segmentFetchCountAndOp = segmentFetchCountAndOp;
+    }
+
+    public FetchCountAndOp getOffsetIdxFetchCountAndOp() {
+        return offsetIdxFetchCountAndOp;
+    }
+
+    public void setOffsetIdxFetchCountAndOp(FetchCountAndOp offsetIdxFetchCountAndOp) {
+        this.offsetIdxFetchCountAndOp = offsetIdxFetchCountAndOp;
+    }
+
+    public FetchCountAndOp getTimeIdxFetchCountAndOp() {
+        return timeIdxFetchCountAndOp;
+    }
+
+    public void setTimeIdxFetchCountAndOp(FetchCountAndOp timeIdxFetchCountAndOp) {
+        this.timeIdxFetchCountAndOp = timeIdxFetchCountAndOp;
+    }
+
+    public FetchCountAndOp getTxnIdxFetchCountAndOp() {
+        return txnIdxFetchCountAndOp;
+    }
+
+    public void setTxnIdxFetchCountAndOp(FetchCountAndOp txnIdxFetchCountAndOp) {
+        this.txnIdxFetchCountAndOp = txnIdxFetchCountAndOp;
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteFetchCount{" +
+                "segmentFetchCountAndOp=" + segmentFetchCountAndOp +
+                ", offsetIdxFetchCountAndOp=" + offsetIdxFetchCountAndOp +
+                ", timeIdxFetchCountAndOp=" + timeIdxFetchCountAndOp +
+                ", txnIdxFetchCountAndOp=" + txnIdxFetchCountAndOp +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        RemoteFetchCount that = (RemoteFetchCount) o;
+        return Objects.equals(segmentFetchCountAndOp, that.segmentFetchCountAndOp) &&
+                Objects.equals(offsetIdxFetchCountAndOp, that.offsetIdxFetchCountAndOp) &&
+                Objects.equals(timeIdxFetchCountAndOp, that.timeIdxFetchCountAndOp) &&
+                Objects.equals(txnIdxFetchCountAndOp, that.txnIdxFetchCountAndOp);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(segmentFetchCountAndOp, offsetIdxFetchCountAndOp, timeIdxFetchCountAndOp, txnIdxFetchCountAndOp);
+    }
+
+    public enum OperationType {
+        EQUALS_TO,
+        GREATER_THAN_OR_EQUALS_TO,
+        LESS_THAN_OR_EQUALS_TO
+    }
+
+    public static class FetchCountAndOp {
+        private final int count;
+        private final OperationType operationType;
+
+        public FetchCountAndOp(int count) {
+            this.count = count;
+            this.operationType = OperationType.EQUALS_TO;
+        }
+
+        public FetchCountAndOp(int count, OperationType operationType) {
+            this.count = count;
+            this.operationType = operationType;
+        }
+
+        public int getCount() {
+            return count;
+        }
+
+        public OperationType getOperationType() {
+            return operationType;
+        }
+
+        @Override
+        public String toString() {
+            return "FetchCountAndOp{" +
+                    "count=" + count +
+                    ", operationType=" + operationType +
+                    '}';
+        }
+    }
+}
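
For reference, a RemoteFetchCount is typically built with mixed operators; the sketch below mirrors getRemoteFetchCount() in the new test above (exactly 6 segment fetches, at most 12 fetches per index type):

    import org.apache.kafka.tiered.storage.specs.RemoteFetchCount;

    class RemoteFetchCountUsage {
        static RemoteFetchCount expected() {
            // Segment fetches are deterministic; index fetches are only bounded from above
            // because the RemoteIndexCache may evict entries at any time.
            return new RemoteFetchCount(
                    new RemoteFetchCount.FetchCountAndOp(6, RemoteFetchCount.OperationType.EQUALS_TO),
                    new RemoteFetchCount.FetchCountAndOp(12, RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO),
                    new RemoteFetchCount.FetchCountAndOp(12, RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO),
                    new RemoteFetchCount.FetchCountAndOp(12, RemoteFetchCount.OperationType.LESS_THAN_OR_EQUALS_TO));
        }
    }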
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
index 5a17ebee0cd..823d4321bc7 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/specs/RemoteFetchSpec.java
@@ -24,7 +24,7 @@ public final class RemoteFetchSpec {
 
     private final int sourceBrokerId;
     private final TopicPartition topicPartition;
-    private final int count;
+    private final RemoteFetchCount remoteFetchCount;
 
     /**
      * Specifies a fetch (download) event from a second-tier storage. This is used to ensure the
@@ -32,14 +32,14 @@ public final class RemoteFetchSpec {
      *
      * @param sourceBrokerId The broker which fetched (a) remote log segment(s) from the second-tier storage.
      * @param topicPartition The topic-partition which segment(s) were fetched.
-     * @param count The number of remote log segment(s) fetched.
+     * @param remoteFetchCount The number of remote log segment(s) and indexes fetched.
      */
     public RemoteFetchSpec(int sourceBrokerId,
                            TopicPartition topicPartition,
-                           int count) {
+                           RemoteFetchCount remoteFetchCount) {
         this.sourceBrokerId = sourceBrokerId;
         this.topicPartition = topicPartition;
-        this.count = count;
+        this.remoteFetchCount = remoteFetchCount;
     }
 
     public int getSourceBrokerId() {
@@ -50,14 +50,14 @@ public final class RemoteFetchSpec {
         return topicPartition;
     }
 
-    public int getCount() {
-        return count;
+    public RemoteFetchCount getRemoteFetchCount() {
+        return remoteFetchCount;
     }
 
     @Override
     public String toString() {
-        return String.format("RemoteFetch[source-broker-id=%d 
topic-partition=%s count=%d]",
-                sourceBrokerId, topicPartition, count);
+        return String.format("RemoteFetch[source-broker-id=%d 
topic-partition=%s remote-fetch-count=%s]",
+                sourceBrokerId, topicPartition, remoteFetchCount);
     }
 
     @Override
@@ -66,12 +66,12 @@ public final class RemoteFetchSpec {
         if (o == null || getClass() != o.getClass()) return false;
         RemoteFetchSpec that = (RemoteFetchSpec) o;
         return sourceBrokerId == that.sourceBrokerId
-                && count == that.count
+                && Objects.equals(remoteFetchCount, that.remoteFetchCount)
                 && Objects.equals(topicPartition, that.topicPartition);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(sourceBrokerId, topicPartition, count);
+        return Objects.hash(sourceBrokerId, topicPartition, remoteFetchCount);
     }
 }

