satishd commented on code in PR #14727: URL: https://github.com/apache/kafka/pull/14727#discussion_r1392031838
########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -133,6 +134,10 @@ public class RemoteLogManager implements Closeable { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class); private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader"; + private static final Set<RemoteLogSegmentState> SEGMENT_DELETION_VALID_STATES = Collections.unmodifiableSet(EnumSet.of( Review Comment: COPY_SEGMENT_STARTED segments are eligible for deletion when those segments were not able to be copied by the leader as the leader went through ungraceful shutdown or for any oher reasons. New leader may pickup the resepctive segments for the targeted offsets that need to be copied and the earlier failed segment will remain in the COPY_SEGMENT_STARTED state and it will eventually be deleted by retention cleanup logic. So, COPY_SEGMENT_STARTED is a valid transition even now when copy and deletion are happening sequentially. ########## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ########## @@ -1027,8 +1027,32 @@ void testFindOffsetByTimestampWithInvalidEpochSegments() throws IOException, Rem assertEquals(Optional.empty(), maybeTimestampAndOffset3); } + @Test + void testFindOffsetByTimestampWithSegmentNotReady() throws IOException, RemoteStorageException { + TopicPartition tp = leaderTopicIdPartition.topicPartition(); + + long ts = time.milliseconds(); + long startOffset = 120; + int targetLeaderEpoch = 10; + + TreeMap<Integer, Long> validSegmentEpochs = new TreeMap<>(); + validSegmentEpochs.put(targetLeaderEpoch, startOffset); + + LeaderEpochFileCache leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint); + leaderEpochFileCache.assign(4, 99L); + leaderEpochFileCache.assign(5, 99L); + leaderEpochFileCache.assign(targetLeaderEpoch, startOffset); + leaderEpochFileCache.assign(12, 500L); + + doTestFindOffsetByTimestamp(ts, startOffset, targetLeaderEpoch, validSegmentEpochs, RemoteLogSegmentState.COPY_SEGMENT_STARTED); + + Optional<FileRecords.TimestampAndOffset> maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache); Review Comment: Can you rename `maybeTimestampAndOffset1` as `maybeTimestampAndOffset`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org