showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587154091


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
      * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
      */
     Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,
-                                                    PartitionFetchState currentFetchState);
+                                                    PartitionFetchState currentFetchState) {
+        // This is currently a no-op but will be used for implementing async tiering logic in KAFKA-13560.
+        return Optional.of(currentFetchState);
+    }
+
+    /**
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
+     */
+    private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+                                        Integer currentLeaderEpoch,
+                                        Long leaderLocalLogStartOffset,
+                                        Integer epochForLeaderLocalLogStartOffset,
+                                        Long leaderLogStartOffset,
+                                        UnifiedLog unifiedLog) throws IOException, RemoteStorageException {
+
+        long nextOffset;
+
+        if (unifiedLog.remoteStorageSystemEnable() && unifiedLog.config().remoteStorageEnable()) {
+            if (replicaMgr.remoteLogManager().isEmpty()) throw new IllegalStateException("RemoteLogManager is not yet instantiated");
+
+            RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+            // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+            // until that offset
+            long previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1;
+            int targetEpoch;
+            // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1)
+            // will have the same epoch.
+            if (epochForLeaderLocalLogStartOffset == 0) {
+                targetEpoch = epochForLeaderLocalLogStartOffset;
+            } else {
+                // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+                OffsetForLeaderEpochResponseData.EpochEndOffset earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, currentLeaderEpoch);
+                // Check if the target offset lies within the range of earlier epoch. Here, epoch's end-offset is exclusive.
+                if (earlierEpochEndOffset.endOffset() > previousOffsetToLeaderLocalLogStartOffset) {
+                    // Always use the leader epoch from returned earlierEpochEndOffset.
+                    // This gives the respective leader epoch, that will handle any gaps in epochs.
+                    // For ex, leader epoch cache contains:
+                    // leader-epoch   start-offset
+                    //  0               20
+                    //  1               85
+                    //  <2> - gap no messages were appended in this leader epoch.
+                    //  3               90
+                    //  4               98
+                    // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+                    // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+                    // So, for offset 89, we should return leader epoch as 1 like below.
+                    targetEpoch = earlierEpochEndOffset.leaderEpoch();
+                } else {
+                    targetEpoch = epochForLeaderLocalLogStartOffset;
+                }
+            }
+
+            Optional<RemoteLogSegmentMetadata> maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset);
+
+            if (maybeRlsm.isPresent()) {
+                RemoteLogSegmentMetadata remoteLogSegmentMetadata = maybeRlsm.get();

Review Comment:
   Nice refactor! Thanks.
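
   For anyone reading along, the epoch-gap case described in the code comment can be reproduced with a small standalone sketch. This is not the PR's code: `EpochGapSketch`, `endOffsetFor`, `targetEpoch`, and the log end offset of 100 are hypothetical names and values chosen for illustration, and the in-memory `TreeMap` only approximates what the leader would return for an earlier-epoch lookup (the largest epoch at or below the requested one, with an exclusive end offset).

```java
import java.util.Map;
import java.util.TreeMap;

public class EpochGapSketch {
    // Hypothetical stand-in for the leader epoch cache: leader epoch -> start offset.
    // Epoch 2 is deliberately missing, matching the example in the code comment.
    static final TreeMap<Integer, Long> EPOCH_START_OFFSETS = new TreeMap<>();
    // Assumed log end offset, used as the end offset of the latest epoch.
    static final long LOG_END_OFFSET = 100L;

    static {
        EPOCH_START_OFFSETS.put(0, 20L);
        EPOCH_START_OFFSETS.put(1, 85L);
        EPOCH_START_OFFSETS.put(3, 90L);
        EPOCH_START_OFFSETS.put(4, 98L);
    }

    // Minimal holder for an (epoch, exclusive end offset) pair.
    static final class EpochEnd {
        final int leaderEpoch;
        final long endOffset;

        EpochEnd(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    // Approximates the leader's answer: the largest known epoch <= requestedEpoch,
    // whose end offset is the start offset of the next known epoch above requestedEpoch.
    static EpochEnd endOffsetFor(int requestedEpoch) {
        Map.Entry<Integer, Long> floor = EPOCH_START_OFFSETS.floorEntry(requestedEpoch);
        if (floor == null) {
            return new EpochEnd(-1, -1L); // nothing known at or below the requested epoch
        }
        Map.Entry<Integer, Long> next = EPOCH_START_OFFSETS.higherEntry(requestedEpoch);
        long endOffset = (next != null) ? next.getValue() : LOG_END_OFFSET;
        return new EpochEnd(floor.getKey(), endOffset);
    }

    // Mirrors the branch structure in the diff: choose the epoch of (leaderLocalLogStartOffset - 1).
    static int targetEpoch(int epochForLeaderLocalLogStartOffset, long leaderLocalLogStartOffset) {
        long previousOffset = leaderLocalLogStartOffset - 1;
        if (epochForLeaderLocalLogStartOffset == 0) {
            return epochForLeaderLocalLogStartOffset;
        }
        EpochEnd earlier = endOffsetFor(epochForLeaderLocalLogStartOffset - 1);
        // The end offset is exclusive, so previousOffset belongs to the earlier epoch
        // only if it is strictly below that end offset.
        return earlier.endOffset > previousOffset
                ? earlier.leaderEpoch
                : epochForLeaderLocalLogStartOffset;
    }

    public static void main(String[] args) {
        // Local log start offset 90 in epoch 3: offset 89 resolves to epoch 1 despite the gap at 2.
        System.out.println(targetEpoch(3, 90L)); // prints 1
        // Local log start offset 93 in epoch 3: offset 92 is still in epoch 3.
        System.out.println(targetEpoch(3, 93L)); // prints 3
    }
}
```

   Using the epoch returned by the lookup, rather than `epochForLeaderLocalLogStartOffset - 1`, is what lets the gap at epoch 2 resolve to epoch 1 for offset 89.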


