This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b825af559ac KAFKA-19755 Move KRaftClusterTest from core module to 
server module [2/4] (#20861)
b825af559ac is described below

commit b825af559acb1c5e3d787de706eeb46769e6f888
Author: Lan Ding <[email protected]>
AuthorDate: Fri Dec 12 22:39:33 2025 +0800

    KAFKA-19755 Move KRaftClusterTest from core module to server module [2/4] 
(#20861)
    
    Move KRaftClusterTest from core module to server module.
    Rewrite
    - testCreateClusterAndCreateListDeleteTopic
    - testCreateClusterAndCreateAndManyTopics
    - testClientQuotas
    - testDefaultClientQuotas
    - testCreateClusterWithAdvertisedPortZero
    - testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer
    - testUnregisterBroker
    
    Reviewers: Hong-Yi Chen <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../kafka/server/KRaftClusterTest.scala            | 405 +--------------------
 .../org/apache/kafka/server/KRaftClusterTest.java  | 402 +++++++++++++++++++-
 2 files changed, 399 insertions(+), 408 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index b0f44b8683b..24a13e3c753 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -17,25 +17,19 @@
 
 package kafka.server
 
-import kafka.network.SocketServer
 import kafka.utils.TestUtils
-import org.apache.kafka.server.IntegrationTestUtils.connectAndReceive
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.errors.{InvalidPartitionsException, 
PolicyViolationException, UnsupportedVersionException}
-import org.apache.kafka.common.message.DescribeClusterRequestData
 import org.apache.kafka.common.metadata.{ConfigRecord, FeatureLevelRecord}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors._
-import org.apache.kafka.common.quota.ClientQuotaAlteration.Op
-import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
-import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, 
DescribeClusterResponse}
+import org.apache.kafka.common.requests.ApiError
 import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
 import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.controller.{QuorumController, 
QuorumControllerIntegrationTestUtils}
-import org.apache.kafka.image.ClusterImage
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.network.SocketServerConfigs
@@ -52,11 +46,10 @@ import org.slf4j.LoggerFactory
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.nio.file.{FileSystems, Files, Path, Paths}
-import java.{lang, util}
+import java.util
 import java.util.concurrent.{ExecutionException, TimeUnit}
 import java.util.{Optional, OptionalLong, Properties}
 import scala.collection.{Seq, mutable}
-import scala.concurrent.duration.{FiniteDuration, MILLISECONDS, SECONDS}
 import scala.jdk.CollectionConverters._
 import scala.util.Using
 
@@ -93,348 +86,6 @@ class KRaftClusterTest {
     }
   }
 
-  @Test
-  def testCreateClusterAndCreateListDeleteTopic(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(3).
-        setNumControllerNodes(3).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == 
BrokerState.RUNNING,
-        "Broker never made it to RUNNING state.")
-      TestUtils.waitUntilTrue(() => 
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
-        "RaftManager was not initialized.")
-
-      val admin = Admin.create(cluster.clientProperties())
-      try {
-        // Create a test topic
-        val newTopic = util.List.of(new NewTopic("test-topic", 1, 3.toShort))
-        val createTopicResult = admin.createTopics(newTopic)
-        createTopicResult.all().get()
-        waitForTopicListing(admin, Seq("test-topic"), Seq())
-
-        // Delete topic
-        val deleteResult = admin.deleteTopics(util.List.of("test-topic"))
-        deleteResult.all().get()
-
-        // List again
-        waitForTopicListing(admin, Seq(), Seq("test-topic"))
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testCreateClusterAndCreateAndManyTopics(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(3).
-        setNumControllerNodes(3).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == 
BrokerState.RUNNING,
-        "Broker never made it to RUNNING state.")
-      TestUtils.waitUntilTrue(() => 
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
-        "RaftManager was not initialized.")
-      val admin = Admin.create(cluster.clientProperties())
-      try {
-        // Create many topics
-        val newTopic = new util.ArrayList[NewTopic]()
-        newTopic.add(new NewTopic("test-topic-1", 2, 3.toShort))
-        newTopic.add(new NewTopic("test-topic-2", 2, 3.toShort))
-        newTopic.add(new NewTopic("test-topic-3", 2, 3.toShort))
-        val createTopicResult = admin.createTopics(newTopic)
-        createTopicResult.all().get()
-
-        // List created topics
-        waitForTopicListing(admin, Seq("test-topic-1", "test-topic-2", 
"test-topic-3"), Seq())
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testClientQuotas(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == 
BrokerState.RUNNING,
-        "Broker never made it to RUNNING state.")
-      val admin = Admin.create(cluster.clientProperties())
-      try {
-        val entity = new ClientQuotaEntity(util.Map.of("user", "testkit"))
-        var filter = ClientQuotaFilter.containsOnly(
-          List(ClientQuotaFilterComponent.ofEntity("user", "testkit")).asJava)
-
-        def alterThenDescribe(entity: ClientQuotaEntity,
-                              quotas: Seq[ClientQuotaAlteration.Op],
-                              filter: ClientQuotaFilter,
-                              expectCount: Int): util.Map[ClientQuotaEntity, 
util.Map[String, lang.Double]] = {
-          val alterResult = admin.alterClientQuotas(util.List.of(new 
ClientQuotaAlteration(entity, quotas.asJava)))
-          try {
-            alterResult.all().get()
-          } catch {
-            case t: Throwable => fail("AlterClientQuotas request failed", t)
-          }
-
-          def describeOrFail(filter: ClientQuotaFilter): 
util.Map[ClientQuotaEntity, util.Map[String, lang.Double]] = {
-            try {
-              admin.describeClientQuotas(filter).entities().get()
-            } catch {
-              case t: Throwable => fail("DescribeClientQuotas request failed", 
t)
-            }
-          }
-
-          val (describeResult, ok) = 
TestUtils.computeUntilTrue(describeOrFail(filter)) {
-            results => results.getOrDefault(entity, util.Map.of[String, 
lang.Double]()).size() == expectCount
-          }
-          assertTrue(ok, "Broker never saw new client quotas")
-          describeResult
-        }
-
-        var describeResult = alterThenDescribe(entity,
-          Seq(new ClientQuotaAlteration.Op("request_percentage", 0.99)), 
filter, 1)
-        assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6)
-
-        describeResult = alterThenDescribe(entity, Seq(
-          new ClientQuotaAlteration.Op("request_percentage", 0.97),
-          new ClientQuotaAlteration.Op("producer_byte_rate", 10000),
-          new ClientQuotaAlteration.Op("consumer_byte_rate", 10001)
-        ), filter, 3)
-        assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6)
-        assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
-        assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6)
-
-        describeResult = alterThenDescribe(entity, Seq(
-          new ClientQuotaAlteration.Op("request_percentage", 0.95),
-          new ClientQuotaAlteration.Op("producer_byte_rate", null),
-          new ClientQuotaAlteration.Op("consumer_byte_rate", null)
-        ), filter, 1)
-        assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6)
-
-        describeResult = alterThenDescribe(entity, Seq(
-          new ClientQuotaAlteration.Op("request_percentage", null)), filter, 0)
-
-        describeResult = alterThenDescribe(entity,
-          Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9999)), 
filter, 1)
-        assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6)
-
-        // Add another quota for a different entity with same user part
-        val entity2 = new ClientQuotaEntity(util.Map.of("user", "testkit", 
"client-id", "some-client"))
-        filter = ClientQuotaFilter.containsOnly(
-          util.List.of(
-            ClientQuotaFilterComponent.ofEntity("user", "testkit"),
-            ClientQuotaFilterComponent.ofEntity("client-id", "some-client"),
-          ))
-        describeResult = alterThenDescribe(entity2,
-          Seq(new ClientQuotaAlteration.Op("producer_byte_rate", 9998)), 
filter, 1)
-        assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6)
-
-        // non-strict match
-        filter = ClientQuotaFilter.contains(
-          util.List.of(ClientQuotaFilterComponent.ofEntity("user", "testkit")))
-
-        TestUtils.tryUntilNoAssertionError() {
-          val results = admin.describeClientQuotas(filter).entities().get()
-          assertEquals(2, results.size(), "Broker did not see two client 
quotas")
-          assertEquals(9999.0, results.get(entity).get("producer_byte_rate"), 
1e-6)
-          assertEquals(9998.0, results.get(entity2).get("producer_byte_rate"), 
1e-6)
-        }
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  def setConsumerByteRate(
-    admin: Admin,
-    entity: ClientQuotaEntity,
-    value: Long
-  ): Unit = {
-    admin.alterClientQuotas(util.List.of(
-      new ClientQuotaAlteration(entity, util.List.of(
-        new Op("consumer_byte_rate", value.doubleValue()))))).
-        all().get()
-  }
-
-  def getConsumerByteRates(admin: Admin): Map[ClientQuotaEntity, Long] = {
-    val allFilter = ClientQuotaFilter.contains(util.List.of)
-    val results = new util.HashMap[ClientQuotaEntity, Long]
-    admin.describeClientQuotas(allFilter).entities().get().forEach {
-      case (entity, entityMap) =>
-        Option(entityMap.get("consumer_byte_rate")).foreach(value => 
results.put(entity, value.longValue()))
-    }
-    results.asScala.toMap
-  }
-
-  @Test
-  def testDefaultClientQuotas(): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(1).
-        setNumControllerNodes(1).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      TestUtils.waitUntilTrue(() => cluster.brokers().get(0).brokerState == 
BrokerState.RUNNING,
-        "Broker never made it to RUNNING state.")
-      val admin = Admin.create(cluster.clientProperties())
-      try {
-        val defaultUser = new 
ClientQuotaEntity(util.Collections.singletonMap[String, String]("user", null))
-        val bobUser = new ClientQuotaEntity(util.Map.of[String, 
String]("user", "bob"))
-        TestUtils.retry(30000) {
-          assertEquals(Map(), getConsumerByteRates(admin))
-        }
-        setConsumerByteRate(admin, defaultUser, 100L)
-        TestUtils.retry(30000) {
-          assertEquals(Map(
-              defaultUser -> 100L
-            ), getConsumerByteRates(admin))
-        }
-        setConsumerByteRate(admin, bobUser, 1000L)
-        TestUtils.retry(30000) {
-          assertEquals(Map(
-            defaultUser -> 100L,
-            bobUser -> 1000L
-          ), getConsumerByteRates(admin))
-        }
-      } finally {
-        admin.close()
-      }
-    } finally {
-      cluster.close()
-    }
-  }
-
-  @Test
-  def testCreateClusterWithAdvertisedPortZero(): Unit = {
-    val brokerPropertyOverrides: util.Map[Integer, util.Map[String, String]] = 
new util.HashMap[Integer, util.Map[String, String]]()
-    Seq.range(0, 3).asJava.forEach(brokerId => {
-      val props = new util.HashMap[String, String]()
-      props.put(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0")
-      props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0")
-      brokerPropertyOverrides.put(brokerId, props)
-    })
-
-    val nodes = new TestKitNodes.Builder()
-      .setNumControllerNodes(1)
-      .setNumBrokerNodes(3)
-      .setPerServerProperties(brokerPropertyOverrides)
-      .build()
-
-    doOnStartedKafkaCluster(nodes) { implicit cluster =>
-      
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName,
 (15L, SECONDS))
-        .nodes.values.forEach { broker =>
-          assertEquals("localhost", broker.host,
-            "Did not advertise configured advertised host")
-          
assertEquals(cluster.brokers.get(broker.id).socketServer.boundPort(cluster.nodes.brokerListenerName),
 broker.port,
-            "Did not advertise bound socket port")
-        }
-    }
-  }
-
-  @Test
-  def testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer(): 
Unit = {
-    val brokerPropertyOverrides: util.Map[Integer, util.Map[String, String]] = 
new util.HashMap[Integer, util.Map[String, String]]()
-    Seq.range(0, 3).asJava.forEach(brokerId => {
-      val props = new util.HashMap[String, String]()
-      props.put(SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0")
-      props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
s"EXTERNAL://advertised-host-$brokerId:${brokerId + 100}")
-      brokerPropertyOverrides.put(brokerId, props)
-    })
-
-    val nodes = new TestKitNodes.Builder()
-      .setNumControllerNodes(1)
-      .setNumBrokerNodes(3)
-      .setNumDisksPerBroker(1)
-      .setPerServerProperties(brokerPropertyOverrides)
-      .build()
-
-    doOnStartedKafkaCluster(nodes) { implicit cluster =>
-      
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes.brokerListenerName,
 (15L, SECONDS))
-        .nodes.values.forEach { broker =>
-          assertEquals(s"advertised-host-${broker.id}", broker.host, "Did not 
advertise configured advertised host")
-          assertEquals(broker.id + 100, broker.port, "Did not advertise 
configured advertised port")
-        }
-    }
-  }
-
-  private def doOnStartedKafkaCluster(nodes: TestKitNodes)
-                                     (action: KafkaClusterTestKit => Unit): 
Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(nodes).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      action(cluster)
-    } finally {
-      cluster.close()
-    }
-  }
-
-  private def 
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(listenerName: 
ListenerName,
-                                                                             
waitTime: FiniteDuration)
-                                                                            
(implicit cluster: KafkaClusterTestKit): DescribeClusterResponse = {
-    val startTime = System.currentTimeMillis
-    val runningBrokerServers = waitForRunningBrokers(1, waitTime)
-    val remainingWaitTime = waitTime - (System.currentTimeMillis - startTime, 
MILLISECONDS)
-    sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(
-      runningBrokerServers.head, listenerName,
-      cluster.nodes.brokerNodes.size, remainingWaitTime)
-  }
-
-  private def waitForRunningBrokers(count: Int, waitTime: FiniteDuration)
-                                   (implicit cluster: KafkaClusterTestKit): 
Seq[BrokerServer] = {
-    def getRunningBrokerServers: Seq[BrokerServer] = 
cluster.brokers.values.asScala.toSeq
-      .filter(brokerServer => brokerServer.brokerState == BrokerState.RUNNING)
-
-    val (runningBrokerServers, hasRunningBrokers) = 
TestUtils.computeUntilTrue(getRunningBrokerServers, 
waitTime.toMillis)(_.nonEmpty)
-    assertTrue(hasRunningBrokers,
-      s"After ${waitTime.toMillis} ms at least $count broker(s) should be in 
RUNNING state, " +
-        s"but only ${runningBrokerServers.size} broker(s) are.")
-    runningBrokerServers
-  }
-
-  private def 
sendDescribeClusterRequestToBoundPortUntilBrokersPropagated(destination: 
BrokerServer,
-                                                                          
listenerName: ListenerName,
-                                                                          
expectedBrokerCount: Int,
-                                                                          
waitTime: FiniteDuration): DescribeClusterResponse = {
-    val (describeClusterResponse, metadataUpToDate) = 
TestUtils.computeUntilTrue(
-      compute = 
sendDescribeClusterRequestToBoundPort(destination.socketServer, listenerName),
-      waitTime = waitTime.toMillis
-    ) {
-      response => response.nodes.size == expectedBrokerCount
-    }
-
-    assertTrue(metadataUpToDate,
-      s"After ${waitTime.toMillis} ms Broker is only aware of 
${describeClusterResponse.nodes.size} brokers, " +
-        s"but $expectedBrokerCount are expected.")
-
-    describeClusterResponse
-  }
-
-  private def sendDescribeClusterRequestToBoundPort(destination: SocketServer,
-                                                    listenerName: 
ListenerName): DescribeClusterResponse = {
-    connectAndReceive[DescribeClusterResponse](new 
DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(),
-      destination.boundPort(listenerName))
-  }
-
   @Test
   def testCreateClusterAndPerformReassignment(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
@@ -746,58 +397,6 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
-  private def clusterImage(
-    cluster: KafkaClusterTestKit,
-    brokerId: Int
-  ): ClusterImage = {
-    cluster.brokers().get(brokerId).metadataCache.currentImage().cluster()
-  }
-
-  private def brokerIsUnfenced(
-    image: ClusterImage,
-    brokerId: Int
-  ): Boolean = {
-    Option(image.brokers().get(brokerId)) match {
-      case None => false
-      case Some(registration) => !registration.fenced()
-    }
-  }
-
-  private def brokerIsAbsent(
-    image: ClusterImage,
-    brokerId: Int
-  ): Boolean = {
-    Option(image.brokers().get(brokerId)).isEmpty
-  }
-
-  @ParameterizedTest
-  @ValueSource(booleans = Array(true, false))
-  def testUnregisterBroker(usingBootstrapController: Boolean): Unit = {
-    val cluster = new KafkaClusterTestKit.Builder(
-      new TestKitNodes.Builder().
-        setNumBrokerNodes(4).
-        setNumControllerNodes(3).build()).build()
-    try {
-      cluster.format()
-      cluster.startup()
-      cluster.waitForReadyBrokers()
-      TestUtils.waitUntilTrue(() => brokerIsUnfenced(clusterImage(cluster, 1), 
0),
-        "Timed out waiting for broker 0 to be unfenced.")
-      cluster.brokers().get(0).shutdown()
-      TestUtils.waitUntilTrue(() => !brokerIsUnfenced(clusterImage(cluster, 
1), 0),
-        "Timed out waiting for broker 0 to be fenced.")
-      val admin = createAdminClient(cluster, bootstrapController = 
usingBootstrapController)
-      try {
-        admin.unregisterBroker(0)
-      } finally {
-        admin.close()
-      }
-      TestUtils.waitUntilTrue(() => brokerIsAbsent(clusterImage(cluster, 1), 
0),
-        "Timed out waiting for broker 0 to be fenced.")
-    } finally {
-      cluster.close()
-    }
-  }
 
   def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController: 
Boolean): Admin = {
     var props: Properties = null
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java 
b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 7f051dfbffa..3286d0f185c 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -16,10 +16,16 @@
  */
 package org.apache.kafka.server;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.AlterClientQuotasResult;
 import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Reconfigurable;
@@ -27,10 +33,19 @@ import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.message.DescribeClusterRequestData;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.quota.ClientQuotaFilter;
+import org.apache.kafka.common.quota.ClientQuotaFilterComponent;
+import org.apache.kafka.common.requests.DescribeClusterRequest;
+import org.apache.kafka.common.requests.DescribeClusterResponse;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.image.ClusterImage;
+import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerState;
 import org.apache.kafka.network.SocketServerConfigs;
 import org.apache.kafka.server.authorizer.AclCreateResult;
@@ -43,28 +58,39 @@ import 
org.apache.kafka.server.authorizer.AuthorizerServerInfo;
 import org.apache.kafka.server.config.ReplicationConfigs;
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
 import org.apache.kafka.server.quota.ClientQuotaCallback;
-import org.apache.kafka.server.quota.ClientQuotaEntity;
 import org.apache.kafka.server.quota.ClientQuotaType;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
+import static org.apache.kafka.server.IntegrationTestUtils.connectAndReceive;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+@Timeout(120)
+@Tag("integration")
 public class KRaftClusterTest {
 
     @Test
@@ -168,8 +194,8 @@ public class KRaftClusterTest {
     @Test
     public void testAuthorizerFailureFoundInControllerStartup() throws 
Exception {
         try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
-            new TestKitNodes.Builder().
-                setNumControllerNodes(3).build())
+            new TestKitNodes.Builder()
+                .setNumControllerNodes(3).build())
             .setConfigProp("authorizer.class.name", 
BadAuthorizer.class.getName())
             .build()) {
             cluster.format();
@@ -262,6 +288,372 @@ public class KRaftClusterTest {
         });
     }
 
+    @Test
+    public void testCreateClusterAndCreateListDeleteTopic() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            String testTopic = "test-topic";
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create a test topic
+                List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1, 
(short) 3));
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopic);
+                createTopicResult.all().get();
+                waitForTopicListing(admin, List.of(testTopic), List.of());
+
+                // Delete topic
+                DeleteTopicsResult deleteResult = 
admin.deleteTopics(List.of(testTopic));
+                deleteResult.all().get();
+
+                // List again
+                waitForTopicListing(admin, List.of(), List.of(testTopic));
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterAndCreateAndManyTopics() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(3)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                // Create many topics
+                List<NewTopic> newTopics = List.of(
+                    new NewTopic("test-topic-1", 2, (short) 3),
+                    new NewTopic("test-topic-2", 2, (short) 3),
+                    new NewTopic("test-topic-3", 2, (short) 3)
+                );
+                CreateTopicsResult createTopicResult = 
admin.createTopics(newTopics);
+                createTopicResult.all().get();
+
+                // List created topics
+                waitForTopicListing(admin, List.of("test-topic-1", 
"test-topic-2", "test-topic-3"), List.of());
+            }
+        }
+    }
+
+    private Map<ClientQuotaEntity, Map<String, Double>> alterThenDescribe(
+        Admin admin,
+        ClientQuotaEntity entity,
+        List<ClientQuotaAlteration.Op> quotas,
+        ClientQuotaFilter filter,
+        int expectCount
+    ) throws Exception {
+        AlterClientQuotasResult alterResult = 
admin.alterClientQuotas(List.of(new ClientQuotaAlteration(entity, quotas)));
+        alterResult.all().get();
+
+        TestUtils.waitForCondition(() -> {
+            Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(filter).entities().get();
+            return results.getOrDefault(entity, Map.of()).size() == 
expectCount;
+        }, "Broker never saw new client quotas");
+
+        return admin.describeClientQuotas(filter).entities().get();
+    }
+
+    @Test
+    public void testClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity entity = new 
ClientQuotaEntity(Map.of("user", "testkit"));
+                ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                Map<ClientQuotaEntity, Map<String, Double>> describeResult = 
alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("request_percentage", 
0.99)), filter, 1);
+                assertEquals(0.99, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.97),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", 
10000.0),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", 10001.0)
+                ), filter, 3);
+                assertEquals(0.97, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+                assertEquals(10000.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+                assertEquals(10001.0, 
describeResult.get(entity).get("consumer_byte_rate"), 1e-6);
+
+                describeResult = alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", 0.95),
+                    new ClientQuotaAlteration.Op("producer_byte_rate", null),
+                    new ClientQuotaAlteration.Op("consumer_byte_rate", null)
+                ), filter, 1);
+                assertEquals(0.95, 
describeResult.get(entity).get("request_percentage"), 1e-6);
+
+                alterThenDescribe(admin, entity, List.of(
+                    new ClientQuotaAlteration.Op("request_percentage", null)), 
filter, 0);
+
+                describeResult = alterThenDescribe(admin, entity,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9999.0)), filter, 1);
+                assertEquals(9999.0, 
describeResult.get(entity).get("producer_byte_rate"), 1e-6);
+
+                ClientQuotaEntity entity2 = new 
ClientQuotaEntity(Map.of("user", "testkit", "client-id", "some-client"));
+                filter = ClientQuotaFilter.containsOnly(
+                    List.of(
+                        ClientQuotaFilterComponent.ofEntity("user", "testkit"),
+                        ClientQuotaFilterComponent.ofEntity("client-id", 
"some-client")
+                    ));
+                describeResult = alterThenDescribe(admin, entity2,
+                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 
9998.0)), filter, 1);
+                assertEquals(9998.0, 
describeResult.get(entity2).get("producer_byte_rate"), 1e-6);
+
+                final ClientQuotaFilter finalFilter = 
ClientQuotaFilter.contains(
+                    List.of(ClientQuotaFilterComponent.ofEntity("user", 
"testkit")));
+
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Map<String, Double>> results = 
admin.describeClientQuotas(finalFilter).entities().get();
+                    if (results.size() != 2) {
+                        return false;
+                    }
+                    assertEquals(9999.0, 
results.get(entity).get("producer_byte_rate"), 1e-6);
+                    assertEquals(9998.0, 
results.get(entity2).get("producer_byte_rate"), 1e-6);
+                    return true;
+                }, "Broker did not see two client quotas");
+            }
+        }
+    }
+
+    private void setConsumerByteRate(Admin admin, ClientQuotaEntity entity, 
Long value) throws Exception {
+        admin.alterClientQuotas(List.of(
+            new ClientQuotaAlteration(entity, List.of(
+                new ClientQuotaAlteration.Op("consumer_byte_rate", 
value.doubleValue())))
+        )).all().get();
+    }
+
+    private Map<ClientQuotaEntity, Long> getConsumerByteRates(Admin admin) 
throws Exception {
+        return 
admin.describeClientQuotas(ClientQuotaFilter.contains(List.of()))
+            .entities().get()
+            .entrySet().stream()
+            .filter(entry -> 
entry.getValue().containsKey("consumer_byte_rate"))
+            .collect(Collectors.toMap(
+                Map.Entry::getKey,
+                entry -> entry.getValue().get("consumer_byte_rate").longValue()
+            ));
+    }
+
+    @Test
+    public void testDefaultClientQuotas() throws Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(1)
+                .setNumControllerNodes(1)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+
+            try (Admin admin = Admin.create(cluster.clientProperties())) {
+                ClientQuotaEntity defaultUser = new 
ClientQuotaEntity(Collections.singletonMap("user", null));
+                ClientQuotaEntity bobUser = new 
ClientQuotaEntity(Map.of("user", "bob"));
+
+                TestUtils.waitForCondition(
+                    () -> getConsumerByteRates(admin).isEmpty(),
+                    "Initial consumer byte rates should be empty");
+
+                setConsumerByteRate(admin, defaultUser, 100L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 1 &&
+                        rates.get(defaultUser) == 100L;
+                }, "Default user rate should be 100");
+
+                setConsumerByteRate(admin, bobUser, 1000L);
+                TestUtils.waitForCondition(() -> {
+                    Map<ClientQuotaEntity, Long> rates = 
getConsumerByteRates(admin);
+                    return rates.size() == 2 &&
+                        rates.get(defaultUser) == 100L &&
+                        rates.get(bobUser) == 1000L;
+                }, "Should have both default and bob user rates");
+            }
+        }
+    }
+
+    @Test
+    public void testCreateClusterWithAdvertisedPortZero() throws Exception {
+        Map<Integer, Map<String, String>> brokerPropertyOverrides = new 
HashMap<>();
+        for (int brokerId = 0; brokerId < 3; brokerId++) {
+            Map<String, String> props = new HashMap<>();
+            props.put(SocketServerConfigs.LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://localhost:0");
+            brokerPropertyOverrides.put(brokerId, props);
+        }
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("localhost", broker.host(),
+                        "Did not advertise configured advertised host");
+                    
assertEquals(cluster.brokers().get(broker.id()).socketServer().boundPort(cluster.nodes().brokerListenerName()),
 broker.port(),
+                        "Did not advertise bound socket port");
+                })
+        );
+    }
+
+    @Test
+    public void 
testCreateClusterWithAdvertisedHostAndPortDifferentFromSocketServer() throws 
Exception {
+        var brokerPropertyOverrides = IntStream.range(0, 
3).boxed().collect(Collectors.toMap(brokerId -> brokerId, brokerId -> Map.of(
+            SocketServerConfigs.LISTENERS_CONFIG, "EXTERNAL://localhost:0",
+            SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"EXTERNAL://advertised-host-" + brokerId + ":" + (brokerId + 100)
+        )));
+
+        TestKitNodes nodes = new TestKitNodes.Builder()
+            .setNumControllerNodes(1)
+            .setNumBrokerNodes(3)
+            .setNumDisksPerBroker(1)
+            .setPerServerProperties(brokerPropertyOverrides)
+            .build();
+
+        doOnStartedKafkaCluster(nodes, cluster ->
+            
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(cluster.nodes().brokerListenerName(),
 Duration.ofSeconds(15), cluster)
+                .nodes().values().forEach(broker -> {
+                    assertEquals("advertised-host-" + broker.id(), 
broker.host(), "Did not advertise configured advertised host");
+                    assertEquals(broker.id() + 100, broker.port(), "Did not 
advertise configured advertised port");
+                })
+        );
+    }
+
+    private void doOnStartedKafkaCluster(TestKitNodes nodes, 
Consumer<KafkaClusterTestKit> action) throws Exception {
+        try (KafkaClusterTestKit cluster = new 
KafkaClusterTestKit.Builder(nodes).build()) {
+            cluster.format();
+            cluster.startup();
+            action.accept(cluster);
+        }
+    }
+
+    private DescribeClusterResponse 
sendDescribeClusterRequestToBoundPortUntilAllBrokersPropagated(
+        ListenerName listenerName,
+        Duration waitTime,
+        KafkaClusterTestKit cluster
+    ) throws RuntimeException {
+        try {
+            long startTime = System.currentTimeMillis();
+            TestUtils.waitForCondition(() -> 
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
+                "Broker never made it to RUNNING state.");
+            TestUtils.waitForCondition(() -> 
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
+                "RaftManager was not initialized.");
+
+            Duration remainingWaitTime = 
waitTime.minus(Duration.ofMillis(System.currentTimeMillis() - startTime));
+
+            final DescribeClusterResponse[] currentResponse = new 
DescribeClusterResponse[1];
+            int expectedBrokerCount = cluster.nodes().brokerNodes().size();
+            TestUtils.waitForCondition(
+                () -> {
+                    currentResponse[0] = connectAndReceive(
+                        new DescribeClusterRequest.Builder(new 
DescribeClusterRequestData()).build(),
+                        
cluster.brokers().get(0).socketServer().boundPort(listenerName)
+                    );
+                    return currentResponse[0].nodes().size() == 
expectedBrokerCount;
+                },
+                remainingWaitTime.toMillis(),
+                String.format("After %s ms Broker is only aware of %s brokers, 
but %s are expected", remainingWaitTime.toMillis(), expectedBrokerCount, 
expectedBrokerCount)
+            );
+            return currentResponse[0];
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void waitForTopicListing(Admin admin, List<String> 
expectedPresent, List<String> expectedAbsent)
+        throws InterruptedException {
+        Set<String> topicsNotFound = new HashSet<>(expectedPresent);
+        Set<String> extraTopics = new HashSet<>();
+        TestUtils.waitForCondition(() -> {
+            Set<String> topicNames = admin.listTopics().names().get();
+            topicsNotFound.removeAll(topicNames);
+            extraTopics.clear();
+            
extraTopics.addAll(topicNames.stream().filter(expectedAbsent::contains).collect(Collectors.toSet()));
+            return topicsNotFound.isEmpty() && extraTopics.isEmpty();
+        }, String.format("Failed to find topic(s): %s and NOT find topic(s): 
%s", topicsNotFound, extraTopics));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testUnregisterBroker(boolean usingBootstrapControllers) throws 
Exception {
+        try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(
+            new TestKitNodes.Builder()
+                .setNumBrokerNodes(4)
+                .setNumControllerNodes(3)
+                .build()).build()) {
+            cluster.format();
+            cluster.startup();
+            cluster.waitForReadyBrokers();
+            TestUtils.waitForCondition(() -> 
brokerIsUnfenced(clusterImage(cluster, 1), 0),
+                "Timed out waiting for broker 0 to be unfenced.");
+            cluster.brokers().get(0).shutdown();
+            TestUtils.waitForCondition(() -> 
!brokerIsUnfenced(clusterImage(cluster, 1), 0),
+                "Timed out waiting for broker 0 to be fenced.");
+
+            try (Admin admin = createAdminClient(cluster, 
usingBootstrapControllers)) {
+                admin.unregisterBroker(0);
+            }
+
+            TestUtils.waitForCondition(() -> 
brokerIsAbsent(clusterImage(cluster, 1), 0),
+                "Timed out waiting for broker 0 to be fenced.");
+        }
+    }
+
+    private ClusterImage clusterImage(KafkaClusterTestKit cluster, int 
brokerId) {
+        return 
cluster.brokers().get(brokerId).metadataCache().currentImage().cluster();
+    }
+
+    private boolean brokerIsUnfenced(ClusterImage image, int brokerId) {
+        BrokerRegistration registration = image.brokers().get(brokerId);
+        if (registration == null) {
+            return false;
+        }
+        return !registration.fenced();
+    }
+
+    private boolean brokerIsAbsent(ClusterImage image, int brokerId) {
+        return !image.brokers().containsKey(brokerId);
+    }
+
+    private Admin createAdminClient(KafkaClusterTestKit cluster, boolean 
usingBootstrapControllers) {
+        Properties props = new Properties();
+        if (usingBootstrapControllers) {
+            props.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG, 
cluster.bootstrapControllers());
+            props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        } else {
+            props = cluster.clientProperties();
+        }
+        props.put(AdminClientConfig.CLIENT_ID_CONFIG, 
this.getClass().getName());
+        return Admin.create(props);
+    }
+
     public static class BadAuthorizer implements Authorizer {
         // Default constructor needed for reflection object creation
         public BadAuthorizer() {
@@ -323,11 +715,11 @@ public class KRaftClusterTest {
         }
 
         @Override
-        public void updateQuota(ClientQuotaType quotaType, ClientQuotaEntity 
quotaEntity, double newValue) {
+        public void updateQuota(ClientQuotaType quotaType, 
org.apache.kafka.server.quota.ClientQuotaEntity quotaEntity, double newValue) {
         }
 
         @Override
-        public void removeQuota(ClientQuotaType quotaType, ClientQuotaEntity 
quotaEntity) {
+        public void removeQuota(ClientQuotaType quotaType, 
org.apache.kafka.server.quota.ClientQuotaEntity quotaEntity) {
         }
 
         @Override


Reply via email to