junrao commented on code in PR #15621: URL: https://github.com/apache/kafka/pull/15621#discussion_r1552022713
########## core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala: ########## @@ -189,14 +220,49 @@ class ListOffsetsIntegrationTest extends KafkaServerTestHarness { } private def verifyListOffsets(topic: String = topicName, expectedMaxTimestampOffset: Int = 1): Unit = { - val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) - assertEquals(0, earliestOffset.offset()) + def check(): Unit = { + val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest(), topic) + assertEquals(0, earliestOffset.offset()) - val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) - assertEquals(3, latestOffset.offset()) + val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest(), topic) + assertEquals(3, latestOffset.offset()) + + val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) + assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + } - val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp(), topic) - assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset()) + // case 0: test the offsets from leader's append path + check() + + // case 1: test the offsets from follower's append path. + // we make a follower be the new leader to handle the ListOffsetRequest + val partitionAssignment = adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0) + val newLeader = brokers.map(_.config.brokerId).find(_ != partitionAssignment.leader().id()).get + adminClient.alterPartitionReassignments(java.util.Collections.singletonMap(new TopicPartition(topic, 0), + Optional.of(new NewPartitionReassignment(java.util.Arrays.asList(newLeader))))).all().get() + waitForAllReassignmentsToComplete(adminClient) + TestUtils.waitUntilTrue(() => newLeader == adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id(), "expected leader: " + newLeader + + ", but actual leader: " + adminClient.describeTopics(java.util.Collections.singletonList(topic)) + .allTopicNames().get().get(topic).partitions().get(0).leader().id()) Review Comment: This is a bit hard to read now. Could we do `adminClient.describeTopics` once and reuse the result? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org