dajac commented on a change in pull request #11942:
URL: https://github.com/apache/kafka/pull/11942#discussion_r835364568



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)

Review comment:
       Using this method creates a lot of unnecessary noise in my opinion. Nowadays, we tend to use `setupReplicaManagerWithMockedPurgatories`, which is simpler. `setupReplicaManagerWithMockedPurgatories` does not support racks, though. I suppose that we could make it accept an optional Map from broker id to rack. Could we try using this one instead? There are many examples in this file.
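       For instance, something along these lines could work (a rough sketch only; the current signature of `setupReplicaManagerWithMockedPurgatories` may differ, and `brokerRacks` is an illustrative parameter name):

```scala
// Hypothetical extension of the test helper: accept an optional mapping from
// broker id to rack and attach it to the alive-broker metadata.
private def setupReplicaManagerWithMockedPurgatories(
    timer: MockTimer,
    brokerId: Int = 0,
    aliveBrokerIds: Seq[Int] = Seq(0, 1),
    brokerRacks: Map[Int, String] = Map.empty, // new, optional
    propsModifier: Properties => Unit = _ => ()): ReplicaManager = {
  // ... existing setup unchanged ...
  // When building the alive brokers, attach the rack (or null when absent):
  val aliveBrokers = aliveBrokerIds.map { id =>
    new Node(id, s"host$id", id, brokerRacks.get(id).orNull)
  }
  // ... rest of the existing body ...
  ???
}

// A rack-aware test could then call, for example:
// setupReplicaManagerWithMockedPurgatories(new MockTimer(time),
//   brokerRacks = Map(0 -> "rack-a", 1 -> "rack-b"))
```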

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower

Review comment:
       I suppose the replica is a leader, no?
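       If so, the comment should presumably just be:

```scala
// Make this replica the leader
```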

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)
+
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1,
+        clientMetadata = Some(metadata))

Review comment:
       Could we also explicitly set the timeout? I would also add a small 
comment explaining why we set min bytes and max wait time.
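       For example (a sketch; the exact name of the wait-time parameter in the `fetchAsConsumer` helper is assumed here):

```scala
// minBytes = 1 with an empty log means the fetch could not be satisfied and
// would normally be parked in the fetch purgatory until maxWaitMs expires.
// Because a preferred read replica is found, the request should instead
// complete immediately, so setting an explicit timeout makes that visible.
val consumerResult = fetchAsConsumer(replicaManager, tidp0,
  new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()),
  minBytes = 1, maxWaitMs = 5000,
  clientMetadata = Some(metadata))
```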

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)
+
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1,
+        clientMetadata = Some(metadata))
+
+      // Fetch from follower succeeds
+      assertTrue(consumerResult.isFired)
+
+      // Returns a preferred replica (should just be the leader, which is None)

Review comment:
       I suppose that the comment is not correct, is it?
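       Judging by the assertion below it, presumably it should say something like:

```scala
// A preferred read replica should be returned (the follower, not the leader)
```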

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)
+
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1,
+        clientMetadata = Some(metadata))
+
+      // Fetch from follower succeeds

Review comment:
       I suppose that the comment is also not correct here, is it?
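       If so, something like this would presumably be more accurate:

```scala
// The consumer fetch, served by the leader, completes immediately
```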

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)
+
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",

Review comment:
       nit: Is the explicit `ClientMetadata` type annotation required? We usually omit types in Scala unless they are needed.
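       i.e. simply:

```scala
val metadata = new DefaultClientMetadata("rack-a", "client-id",
  InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
```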

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)

Review comment:
       Why do we need this? Does the replica selector ignore the replica if it is not caught up? It might be worth adding a small comment.
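       For example (assuming the answer is that only replicas caught up to the fetch offset are offered to the selector):

```scala
// Record a fetch from broker 1 at offset 0 so that it counts as caught up;
// otherwise it would be filtered out before the replica selector runs and
// no preferred read replica could be returned.
replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)
```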

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1300,6 +1300,67 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testHasPreferredReplica(): Unit = {
+    val topicPartition = 0
+    val topicId = Uuid.randomUuid()
+    val followerBrokerId = 0
+    val leaderBrokerId = 1
+    val leaderEpoch = 1
+    val leaderEpochIncrement = 2
+    val countDownLatch = new CountDownLatch(1)
+
+    // Prepare the mocked components for the test
+    val props = new Properties()
+    props.put(KafkaConfig.ReplicaSelectorClassProp, "org.apache.kafka.common.replica.RackAwareReplicaSelector")
+    val (replicaManager, _) = prepareReplicaManagerAndLogManager(new MockTimer(time),
+      topicPartition, leaderEpoch + leaderEpochIncrement, followerBrokerId,
+      leaderBrokerId, countDownLatch, expectTruncation = true, topicId = Some(topicId), extraProps = props)
+
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+
+      val tp0 = new TopicPartition(topic, 0)
+      val tidp0 = new TopicIdPartition(topicId, tp0)
+
+      initializeLogAndTopicId(replicaManager, tp0, topicId)
+
+      // Make this replica the follower
+      val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(1)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(false)).asJava,
+        Collections.singletonMap(topic, topicId),
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+      replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
+      replicaManager.getPartitionOrException(tp0).updateFollowerFetchState(1, new LogOffsetMetadata(0), 0, 0, 0)
+
+      val metadata: ClientMetadata = new DefaultClientMetadata("rack-a", "client-id",
+        InetAddress.getByName("localhost"), KafkaPrincipal.ANONYMOUS, "default")
+
+      val consumerResult = fetchAsConsumer(replicaManager, tidp0,
+        new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, Optional.empty()), minBytes = 1,
+        clientMetadata = Some(metadata))
+
+      // Fetch from follower succeeds
+      assertTrue(consumerResult.isFired)
+
+      // Returns a preferred replica (should just be the leader, which is None)
+      assertTrue(consumerResult.assertFired.preferredReadReplica.isDefined)

Review comment:
       Could we also verify that the fetch purgatory was not touched? I want to make sure that no delayed fetch was inserted.
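       For instance (a sketch; it assumes the test can reach the replica manager's fetch purgatory, e.g. the mocked one created by `setupReplicaManagerWithMockedPurgatories`):

```scala
// The request must complete immediately once a preferred read replica is
// found, so nothing should have been added to the fetch purgatory.
assertEquals(0, replicaManager.delayedFetchPurgatory.watched,
  "expected no delayed fetch to be inserted into the purgatory")
```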




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

