satishd commented on code in PR #14727:
URL: https://github.com/apache/kafka/pull/14727#discussion_r1392031838


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
+    private static final Set<RemoteLogSegmentState> SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of(

Review Comment:
   COPY_SEGMENT_STARTED segments are eligible for deletion when the leader was unable to finish copying them, e.g. because it went through an ungraceful shutdown or failed for any other reason. A new leader may pick up the respective segments for the targeted offsets that need to be copied, while the earlier failed segment remains in the COPY_SEGMENT_STARTED state and is eventually deleted by the retention cleanup logic.
   
   So, deleting a segment in COPY_SEGMENT_STARTED is a valid transition even now, when copy and deletion happen sequentially.
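   
   A minimal sketch of what the guard under discussion could look like (the diff above is truncated, so the exact membership of the set in this PR is an assumption; the `RemoteLogSegmentState` constants are the existing enum values):
   
   ```java
   import java.util.Collections;
   import java.util.EnumSet;
   import java.util.Set;
   
   import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
   
   public class SegmentDeletionStatesSketch {
       // Hypothetical sketch: states in which a remote segment may be deleted.
       // COPY_SEGMENT_STARTED is included so that segments abandoned by a leader
       // that failed mid-copy are still reclaimed by retention cleanup.
       static final Set<RemoteLogSegmentState> SEGMENT_DELETION_VALID_STATES =
               Collections.unmodifiableSet(EnumSet.of(
                       RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
                       RemoteLogSegmentState.DELETE_SEGMENT_STARTED,
                       RemoteLogSegmentState.COPY_SEGMENT_STARTED));
   
       // Usage sketch: a deletion path would check the segment's current state
       // against this set before scheduling the remote delete.
       static boolean isEligibleForDeletion(RemoteLogSegmentState state) {
           return SEGMENT_DELETION_VALID_STATES.contains(state);
       }
   }
   ```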



##########
core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java:
##########
@@ -1027,8 +1027,32 @@ void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, Rem
         assertEquals(Optional.empty(), maybeTimestampAndOffset3);
     }
 
+    @Test
+    void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteStorageException {
+        TopicPartition tp = leaderTopicIdPartition.topicPartition();
+
+        long ts = time.milliseconds();
+        long startOffset = 120;
+        int targetLeaderEpoch = 10;
+
+        TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>();
+        validSegmentEpochs.put(targetLeaderEpoch, startOffset);
+
+        LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint);
+        leaderEpochFileCache.assign(4, 99L);
+        leaderEpochFileCache.assign(5, 99L);
+        leaderEpochFileCache.assign(targetLeaderEpoch, startOffset);
+        leaderEpochFileCache.assign(12, 500L);
+
+        doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED);
+
+        Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache);

Review Comment:
   Can you rename `maybeTimestampAndOffset1` to `maybeTimestampAndOffset`?
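   
   For illustration, a sketch with the suggestion applied (the trailing assertion is an assumption, mirroring the sibling test shown above; it is not part of the diff):
   
   ```java
   Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset =
           remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache);
   // A segment still in COPY_SEGMENT_STARTED is not ready to serve reads,
   // so no timestamp/offset should be found.
   assertEquals(Optional.empty(), maybeTimestampAndOffset);
   ```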


