junrao commented on code in PR #15621:
URL: https://github.com/apache/kafka/pull/15621#discussion_r1552264764


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java:
##########
@@ -483,12 +484,13 @@ public int recover(ProducerStateManager producerStateManager, Optional<LeaderEpo
                 // The max timestamp is exposed at the batch level, so no need to iterate the records
                 if (batch.maxTimestamp() > maxTimestampSoFar()) {
                     maxTimestampAndOffsetSoFar = new TimestampOffset(batch.maxTimestamp(), batch.lastOffset());
+                    System.out.println("[CHIA] recovery: " + maxTimestampAndOffsetSoFar);

Review Comment:
   Is this just for testing?



##########
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala:
##########
@@ -189,14 +215,52 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
   }
 
   private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = {
-    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
-    assertEquals(0, earliestOffset.offset())
+    def check(): Unit = {
+      val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic)
+      assertEquals(0, earliestOffset.offset())
 
-    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
-    assertEquals(3, latestOffset.offset())
+      val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic)
+      assertEquals(3, latestOffset.offset())
+
+      val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
+      assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+    }
 
-    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic)
-    assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset())
+    // case 0: test the offsets from leader's append path
+    check()
+
+    // case 1: test the offsets from follower's append path.
+    // we make a follower be the new leader to handle the ListOffsetRequest
+    def leader(): Int = adminClient.describeTopics(java.util.Collections.singletonList(topic))
+      .allTopicNames().get().get(topic).partitions().get(0).leader().id()
+
+    val previousLeader = leader()
+    val newLeader = brokers.map(_.config.brokerId).find(_ != previousLeader).get
+
+    // change the leader to new one
+    adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0),
+      Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader))))).all().get()
+    // wait for all reassignments get completed
+    waitForAllReassignmentsToComplete(adminClient)
+    // make sure we are able to see the new leader
+    TestUtils.waitUntilTrue(() => newLeader == leader(), s"expected leader: $newLeader but actual: ${leader()}")

Review Comment:
   If this check fails, calling leader() again inside the error message may return a different leader than the one that actually failed the condition. Could we save the last observed leader and use that in the error message instead?
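
   A minimal sketch of that suggestion, reusing `leader()`, `newLeader`, and `TestUtils.waitUntilTrue` from the diff above (the `lastLeader` var and the capture-in-condition approach are just one way to do it):

   ```scala
   // Remember the last leader observed by the condition so the failure message
   // reports the value that actually failed the check, rather than re-querying
   // leader() after the fact.
   var lastLeader = -1
   TestUtils.waitUntilTrue(() => {
     lastLeader = leader()
     newLeader == lastLeader
   }, s"expected leader: $newLeader but actual: $lastLeader")
   ```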


