dajac commented on a change in pull request #11965: URL: https://github.com/apache/kafka/pull/11965#discussion_r839254388
########## File path: core/src/test/java/kafka/test/MockReplicaSelector.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.replica.ClientMetadata; +import org.apache.kafka.common.replica.PartitionView; +import org.apache.kafka.common.replica.ReplicaSelector; +import org.apache.kafka.common.replica.ReplicaView; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +public class MockReplicaSelector implements ReplicaSelector { Review comment: I am not a fan of putting it in this package. Out of curiosity, have you tried to define it in the test itself? I guess that should work. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1301,59 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "kafka.test.MockReplicaSelector")) + try { + val leaderBrokerId = 0 + val followerBrokerId = 1 + val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava + val topicId = Uuid.randomUuid() + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + initializeLogAndTopicId(replicaManager, tp0, topicId) + when(replicaManager.metadataCache.getPartitionReplicaEndpoints( + tp0, + new ListenerName("default") + )).thenReturn(Map( + leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), + followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-a") + ).toMap) Review comment: Do we need this if the selection is not ran? ########## File path: core/src/test/java/kafka/test/MockReplicaSelector.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.test; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.replica.ClientMetadata; +import org.apache.kafka.common.replica.PartitionView; +import org.apache.kafka.common.replica.ReplicaSelector; +import org.apache.kafka.common.replica.ReplicaView; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; + +public class MockReplicaSelector implements ReplicaSelector { + private final AtomicLong triggerElectionCount = new AtomicLong(); + + public long getTriggerElectionCount() { Review comment: I guess you meant `Selection` here. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1301,59 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "kafka.test.MockReplicaSelector")) + try { + val leaderBrokerId = 0 + val followerBrokerId = 1 + val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava + val topicId = Uuid.randomUuid() + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + initializeLogAndTopicId(replicaManager, tp0, topicId) + when(replicaManager.metadataCache.getPartitionReplicaEndpoints( + tp0, + new ListenerName("default") + )).thenReturn(Map( + leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), + followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-a") + ).toMap) + + // Make this replica the follower + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) + + val metadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") + + val consumerResult = fetchAsConsumer(replicaManager, tidp0, + new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, + Optional.empty()), clientMetadata = Some(metadata)) + + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) + + // Expect not run the preferred read replica selection + assertEquals(0, replicaManager.replicaSelectorOpt.get.asInstanceOf[MockReplicaSelector].getTriggerElectionCount) + + } finally replicaManager.shutdown(checkpointHW = false) + } + + Review comment: nit: We can remove one empty line here. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1300,6 +1301,59 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + @Test + def testFetchFromFollowerShouldNotRunPreferLeaderSelect(): Unit = { + val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), + propsModifier = props => props.put(KafkaConfig.ReplicaSelectorClassProp, "kafka.test.MockReplicaSelector")) + try { + val leaderBrokerId = 0 + val followerBrokerId = 1 + val brokerList = Seq[Integer](leaderBrokerId, followerBrokerId).asJava + val topicId = Uuid.randomUuid() + val tp0 = new TopicPartition(topic, 0) + val tidp0 = new TopicIdPartition(topicId, tp0) + initializeLogAndTopicId(replicaManager, tp0, topicId) + when(replicaManager.metadataCache.getPartitionReplicaEndpoints( + tp0, + new ListenerName("default") + )).thenReturn(Map( + leaderBrokerId -> new Node(leaderBrokerId, "host1", 9092, "rack-a"), + followerBrokerId -> new Node(followerBrokerId, "host2", 9092, "rack-a") + ).toMap) + + // Make this replica the follower + val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch, + Seq(new LeaderAndIsrPartitionState() + .setTopicName(topic) + .setPartitionIndex(0) + .setControllerEpoch(0) + .setLeader(1) + .setLeaderEpoch(1) + .setIsr(brokerList) + .setZkVersion(0) + .setReplicas(brokerList) + .setIsNew(false)).asJava, + Collections.singletonMap(topic, topicId), + Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() + replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequest, (_, _) => ()) + + val metadata = new DefaultClientMetadata("rack-a", "client-id", + InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS, "default") + + val consumerResult = fetchAsConsumer(replicaManager, tidp0, + new PartitionData(Uuid.ZERO_UUID, 0, 0, 100000, + Optional.empty()), clientMetadata = Some(metadata)) + + // Fetch from follower succeeds + assertTrue(consumerResult.isFired) + Review comment: Could we also assert that no preferred leader is returned in the result? ########## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ########## @@ -1234,8 +1234,9 @@ class ReplicaManager(val config: KafkaConfig, fetchOffset: Long, currentTimeMs: Long): Option[Int] = { partition.leaderReplicaIdOpt.flatMap { leaderReplicaId => Review comment: We should add a comment to `leaderReplicaIdOpt` in the `Partition` class to explain it a bit better. Could you do it please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org