clolov commented on code in PR #15213: URL: https://github.com/apache/kafka/pull/15213#discussion_r1455525955
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) { val curLocalLogStartOffset = localLogStartOffset() - val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => { + val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => { val epoch = cache.epochForOffset(curLocalLogStartOffset) - if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]() + if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]() }) - val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset) - Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch) - else Optional.empty[Integer]() - Review Comment: I didn't really see a point in this check `earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset`. As far as I can tell the `cache.epochForOffset` already carries it out. Let me know in case I have misunderstood something. 
########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -2126,6 +2126,94 @@ class UnifiedLogTest { log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, Some(remoteLogManager))) } + @Test + def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1) + val log = createLog(logDir, logConfig) + + assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + + val firstTimestamp = mockTime.milliseconds + val leaderEpoch = 0 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + val secondTimestamp = firstTimestamp + 1 + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = secondTimestamp), + leaderEpoch = leaderEpoch) + + log.appendAsLeader(TestUtils.singletonRecords( + value = TestUtils.randomBytes(10), + timestamp = firstTimestamp), + leaderEpoch = leaderEpoch) + + assertEquals(None, log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)) + } + + @Test + def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = { Review Comment: This test could be combined with `testFetchOffsetByTimestampFromRemoteStorage`, since the only differences are lines 2167, 2193, 2203 and 2204. Let me know your thoughts! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org