satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590567527


##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1858,16 +1858,6 @@ class KafkaConfigTest {
     }
   }
 
-  @Test
-  def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {

Review Comment:
   Can we also add a positive test for multiple log dirs, like the one created 
below for a single dir? Maybe refactor the test below to cover both 
scenarios.



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -40,19 +90,176 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        // Restore producer snapshot
+        File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+        Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic manner.
+        Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+                tmpSnapshotFile, StandardCopyOption.REPLACE_EXISTING);
+        Utils.atomicMoveWithFallback(tmpSnapshotFile, snapshotFile.toPath(), false);
+
+        // Reload producer snapshots.
+        unifiedLog.producerStateManager().truncateFullyAndReloadSnapshots();
+        unifiedLog.loadProducerState(nextOffset);
+    }
 
     /**
-     * Optionally advance the state of the tier state machine, based on the
-     * current PartitionFetchState. The decision to advance the tier
-     * state machine is implementation specific.
-     *
-     * @param topicPartition the topic partition
-     * @param currentFetchState the current PartitionFetchState which will
-     *                          be used to derive the return value
-     *
-     * @return the new PartitionFetchState if the tier state machine was advanced, otherwise, return the currentFetchState
+     * It tries to build the required state for this partition from leader and remote storage so that it can start
+     * fetching records from the leader. The return value is the next offset to fetch from the leader, which is the
+     * next offset following the end offset of the remote log portion.
      */
-    Optional<PartitionFetchState> maybeAdvanceState(TopicPartition topicPartition,

Review Comment:
   We can remove this function for now. We will reintroduce it when 
https://issues.apache.org/jira/browse/KAFKA-13560 is addressed. I will leave a 
comment once this PR is merged.
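
   For reference, the copy-then-rename step in buildProducerSnapshotFile in the 
hunk above can be sketched on its own with only java.nio. This is a minimal 
illustration under stated assumptions, not the code in this PR: the class 
AtomicSnapshotCopy and the method writeAtomically are hypothetical names, the 
fallback is a simplified stand-in for what Utils.atomicMoveWithFallback does, 
and the snapshot file name is made up for the demo.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, not part of Kafka: shows the "write to .tmp, then rename" pattern.
public class AtomicSnapshotCopy {

    // Copies the stream to "<target>.tmp" and then renames it onto the target, so a
    // reader never observes a partially written target file.
    static void writeAtomically(InputStream source, Path target) throws IOException {
        Path tmp = Paths.get(target.toAbsolutePath() + ".tmp");
        // Overwrite any stale temp file left behind by an earlier failed attempt.
        Files.copy(source, tmp, StandardCopyOption.REPLACE_EXISTING);
        try {
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Simplified fallback (an assumption of this sketch): a plain replacing move.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("snapshot-demo");
        Path target = dir.resolve("00000000000000000042.snapshot"); // made-up file name
        byte[] payload = "producer-state".getBytes(StandardCharsets.UTF_8);
        try (InputStream in = new ByteArrayInputStream(payload)) {
            writeAtomically(in, target);
        }
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8));
    }
}
```

   Writing to a sibling temp file first keeps the temp file and the target on 
the same file system, which is what normally makes the rename atomic.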


