clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1455525955


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
         val curLocalLogStartOffset = localLogStartOffset()
 
-        val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache 
=> {
+        val epochOpt: Optional[Integer] = 
leaderEpochCache.asJava.flatMap(cache => {
           val epoch = cache.epochForOffset(curLocalLogStartOffset)
-          if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else 
Optional.empty[EpochEntry]()
+          if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else 
Optional.empty[Integer]()
         })
 
-        val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-          Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-        else Optional.empty[Integer]()
-

Review Comment:
   I don't really see the point of the check 
`earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset`. As 
far as I can tell, `cache.epochForOffset` already carries it out. Let me 
know if I have misunderstood something.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -2126,6 +2126,94 @@ class UnifiedLogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, 
Some(remoteLogManager)))
   }
 
+  @Test
+  def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, 
indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, 
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    assertEquals(None, 
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+  }
+
+  @Test
+  def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {

Review Comment:
   This test could be combined with 
`testFetchOffsetByTimestampFromRemoteStorage`, as the only differences are 
lines 2167, 2193, 2203 and 2204. Let me know your thoughts!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to