This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 44229581ed8 KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498) 44229581ed8 is described below commit 44229581ed8994416c41ac6584c150564185f0da Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Thu Aug 11 14:08:37 2022 -0700 KAFKA-13986; Brokers should include node.id in fetches to metadata quorum (#12498) Currently we do not set the replicaId in fetches from brokers to the metadata quorum. It is useful to do so since that allows us to debug replication using the `DescribeQuorum` API. Reviewers: dengziming <dengziming1...@gmail.com>, José Armando García Sancio <jsan...@users.noreply.github.com> --- core/src/main/scala/kafka/raft/RaftManager.scala | 8 +------- core/src/test/java/kafka/test/ClusterInstance.java | 13 +++++++++++++ .../kafka/test/junit/RaftClusterInvocationContext.java | 15 +++++++++++++++ .../java/kafka/test/junit/ZkClusterInvocationContext.java | 13 +++++++++++++ core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala | 8 ++++---- .../unit/kafka/server/DescribeQuorumRequestTest.scala | 9 +++++---- 6 files changed, 51 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala index cbb9f7b89bf..a44d9d8fe01 100644 --- a/core/src/main/scala/kafka/raft/RaftManager.scala +++ b/core/src/main/scala/kafka/raft/RaftManager.scala @@ -24,7 +24,6 @@ import java.util.concurrent.CompletableFuture import kafka.log.UnifiedLog import kafka.raft.KafkaRaftManager.RaftIoThread import kafka.server.{KafkaConfig, MetaProperties} -import kafka.server.KafkaRaftServer.ControllerRole import kafka.utils.timer.SystemTimer import kafka.utils.{KafkaScheduler, Logging, ShutdownableThread} import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient} @@ -181,12 +180,7 @@ class KafkaRaftManager[T]( val expirationTimer = new SystemTimer("raft-expiration-executor") val 
expirationService = new TimingWheelExpirationService(expirationTimer) val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state")) - - val nodeId = if (config.processRoles.contains(ControllerRole)) { - OptionalInt.of(config.nodeId) - } else { - OptionalInt.empty() - } + val nodeId = OptionalInt.of(config.nodeId) val client = new KafkaRaftClient( recordSerde, diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index a7052857c36..9058508fa94 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; public interface ClusterInstance { @@ -50,6 +51,18 @@ public interface ClusterInstance { */ ClusterConfig config(); + /** + * Return the set of all controller IDs configured for this test. For kraft, this + * will return only the nodes which have the "controller" role enabled in `process.roles`. + * For zookeeper, this will return all broker IDs since they are all eligible controllers. + */ + Set<Integer> controllerIds(); + + /** + * Return the set of all broker IDs configured for this test. + */ + Set<Integer> brokerIds(); + /** * The listener for this cluster as configured by {@link ClusterTest} or by {@link ClusterConfig}. 
If * unspecified by those sources, this will return the listener for the default security protocol PLAINTEXT diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index cef71042d3f..5cd3ec3e246 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -192,6 +193,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte return clusterConfig; } + @Override + public Set<Integer> controllerIds() { + return controllers() + .map(controllerServer -> controllerServer.config().nodeId()) + .collect(Collectors.toSet()); + } + + @Override + public Set<Integer> brokerIds() { + return brokers() + .map(brokerServer -> brokerServer.config().nodeId()) + .collect(Collectors.toSet()); + } + @Override public KafkaClusterTestKit getUnderlying() { return clusterReference.get(); diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java index d8375b01279..18a85e2d7bf 100644 --- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java @@ -45,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -257,6 +258,18 @@ public class ZkClusterInvocationContext implements 
TestTemplateInvocationContext return config; } + @Override + public Set<Integer> controllerIds() { + return brokerIds(); + } + + @Override + public Set<Integer> brokerIds() { + return servers() + .map(brokerServer -> brokerServer.config().nodeId()) + .collect(Collectors.toSet()); + } + @Override public IntegrationTestHarness getUnderlying() { return clusterReference.get(); diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala index f8fac503d6e..9d7a93db94c 100644 --- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala +++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala @@ -82,23 +82,23 @@ class RaftManagerTest { } @Test - def testSentinelNodeIdIfBrokerRoleOnly(): Unit = { + def testNodeIdPresentIfBrokerRoleOnly(): Unit = { val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "broker", "1") - assertFalse(raftManager.client.nodeId.isPresent) + assertEquals(1, raftManager.client.nodeId.getAsInt) raftManager.shutdown() } @Test def testNodeIdPresentIfControllerRoleOnly(): Unit = { val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller", "1") - assertTrue(raftManager.client.nodeId.getAsInt == 1) + assertEquals(1, raftManager.client.nodeId.getAsInt) raftManager.shutdown() } @Test def testNodeIdPresentIfColocated(): Unit = { val raftManager = instantiateRaftManagerWithConfigs(new TopicPartition("__raft_id_test", 0), "controller,broker", "1") - assertTrue(raftManager.client.nodeId.getAsInt == 1) + assertEquals(1, raftManager.client.nodeId.getAsInt) raftManager.shutdown() } diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index f8da00f10e8..b53004b2eaf 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -80,11 +80,12 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { assertTrue(leaderState.logEndOffset > 0) val voterData = partitionData.currentVoters.asScala + assertEquals(cluster.controllerIds().asScala, voterData.map(_.replicaId).toSet); + val observerData = partitionData.observers.asScala - assertEquals(1, voterData.size) - assertEquals(0, observerData.size) - voterData.foreach { state => - assertTrue(0 < state.replicaId) + assertEquals(cluster.brokerIds().asScala, observerData.map(_.replicaId).toSet); + + (voterData ++ observerData).foreach { state => assertTrue(0 < state.logEndOffset) assertEquals(-1, state.lastFetchTimestamp) assertEquals(-1, state.lastCaughtUpTimestamp)