This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 434fe7c26e3 KAFKA-18081 Remove isKRaftTest from the kraft-only tests 
(#17934)
434fe7c26e3 is described below

commit 434fe7c26e31a6cfeb5e17e83fb52a260eac398e
Author: Yung <[email protected]>
AuthorDate: Wed Nov 27 18:37:43 2024 +0800

    KAFKA-18081 Remove isKRaftTest from the kraft-only tests (#17934)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/admin/RemoteTopicCrudTest.scala          |  12 +--
 ...merWithLegacyMessageFormatIntegrationTest.scala |  34 ++-----
 .../server/DynamicBrokerReconfigurationTest.scala  |  43 +--------
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |  18 +---
 .../scala/unit/kafka/metrics/MetricsTest.scala     |   2 +-
 .../server/AbstractCreateTopicsRequestTest.scala   |   4 -
 .../kafka/server/AbstractMetadataRequestTest.scala |  14 +--
 .../kafka/server/ControllerMutationQuotaTest.scala |  15 +--
 .../server/CreateTopicsRequestWithPolicyTest.scala |  18 +---
 ...leteTopicsRequestWithDeletionDisabledTest.scala |   2 +-
 .../kafka/server/DescribeClusterRequestTest.scala  |  11 +--
 .../kafka/server/DynamicConfigChangeTest.scala     | 102 +++++++++------------
 .../kafka/server/KafkaMetricsReporterTest.scala    |   9 +-
 .../unit/kafka/server/ServerShutdownTest.scala     |  12 +--
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   6 +-
 15 files changed, 83 insertions(+), 219 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala 
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index faef809b170..95c0b015ac5 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -504,14 +504,10 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
     val tsDisabledProps = TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).head
     instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
 
-    if (isKRaftTest()) {
-      recreateBrokers(startup = true)
-      
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
-      // Normally the exception is thrown as part of the TearDown method of 
the parent class(es). We would like to not do this.
-      faultHandler.setIgnore(true)
-    } else {
-      assertThrows(classOf[ConfigException], () => recreateBrokers(startup = 
true))
-    }
+    recreateBrokers(startup = true)
+    
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
+    // Normally the exception is thrown as part of the TearDown method of the 
parent class(es). We would like to not do this.
+    faultHandler.setIgnore(true)
   }
 
   @ParameterizedTest
diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
index 5d9455d1b40..ab4cd82d6f6 100644
--- 
a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
@@ -20,7 +20,6 @@ import kafka.utils.TestInfoUtils
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.server.config.ReplicationConfigs
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, 
assertThrows}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
@@ -32,13 +31,6 @@ import scala.jdk.CollectionConverters._
 
 class ConsumerWithLegacyMessageFormatIntegrationTest extends 
AbstractConsumerTest {
 
-  override protected def brokerPropertyOverrides(properties: Properties): Unit 
= {
-    // legacy message formats are only supported with IBP < 3.0
-    // KRaft mode is not supported for inter.broker.protocol.version = 2.8, 
The minimum version required is 3.0-IV1"
-    if (!isKRaftTest())
-      properties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, 
"2.8")
-  }
-
   @nowarn("cat=deprecation")
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@@ -91,22 +83,16 @@ class ConsumerWithLegacyMessageFormatIntegrationTest 
extends AbstractConsumerTes
     assertEquals(20, timestampTopic1P1.timestamp)
     assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch)
 
-    if (!isKRaftTest()) {
-      assertNull(timestampOffsets.get(new TopicPartition(topic2, 0)), "null 
should be returned when message format is 0.9.0")
-      assertNull(timestampOffsets.get(new TopicPartition(topic2, 1)), "null 
should be returned when message format is 0.9.0")
-    }
-    else {
-      // legacy message formats are supported for IBP version < 3.0 and KRaft 
runs on minimum version 3.0-IV1
-      val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 
0))
-      assertEquals(40, timestampTopic2P0.offset)
-      assertEquals(40, timestampTopic2P0.timestamp)
-      assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
-
-      val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 
1))
-      assertEquals(60, timestampTopic2P1.offset)
-      assertEquals(60, timestampTopic2P1.timestamp)
-      assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)
-    }
+    // legacy message formats are supported for IBP version < 3.0 and KRaft 
runs on minimum version 3.0-IV1
+    val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0))
+    assertEquals(40, timestampTopic2P0.offset)
+    assertEquals(40, timestampTopic2P0.timestamp)
+    assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
+
+    val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 1))
+    assertEquals(60, timestampTopic2P1.offset)
+    assertEquals(60, timestampTopic2P1.timestamp)
+    assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)
 
     val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0))
     assertEquals(80, timestampTopic3P0.offset)
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f19af34500d..b0c86ee4c67 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -30,13 +30,11 @@ import javax.management.ObjectName
 import com.yammer.metrics.core.MetricName
 import kafka.admin.ConfigCommand
 import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
 import kafka.log.UnifiedLog
 import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
 import kafka.security.JaasTestUtils
 import kafka.utils._
 import kafka.utils.Implicits._
-import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
 import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -52,12 +50,10 @@ import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.provider.FileConfigProvider
 import org.apache.kafka.common.errors.{AuthenticationException, 
InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.MetadataRequestData
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric, 
MetricsContext, MetricsReporter, Quota}
 import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS, 
TRUSTSTORE_PROPS}
 import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.MetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
@@ -69,7 +65,7 @@ import org.apache.kafka.server.metrics.{KafkaYammerMetrics, 
MetricConfigs}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
-import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.kafka.test.TestSslUtils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -333,14 +329,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
     }
 
-    if (!isKRaftTest()) {
-      // fetch from ZK, values should be unresolved
-      val props = fetchBrokerConfigsFromZooKeeper(servers.head)
-      assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) == 
PollingIntervalVal, "polling interval is not updated in ZK")
-      assertTrue(props.getProperty(configPrefix + SSL_TRUSTSTORE_TYPE_CONFIG) 
== SslTruststoreTypeVal, "store type is not updated in ZK")
-      assertTrue(props.getProperty(configPrefix + 
SSL_KEYSTORE_PASSWORD_CONFIG) == SslKeystorePasswordVal, "keystore password is 
not updated in ZK")
-    }
-
     // verify the update
     // 1. verify update not occurring if the value of property is same.
     alterConfigsUsingConfigCommand(updatedProps)
@@ -459,23 +447,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
       verifyProduceConsume(producer, consumer, 10, topic)
     }
 
-    def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
-      val nonControllerBroker = servers.find(_.config.brokerId != 
controller.config.brokerId).get
-      val brokerToControllerManager = 
nonControllerBroker.clientToControllerChannelManager
-      val completionHandler = new TestControllerRequestCompletionHandler()
-      brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new 
MetadataRequestData()), completionHandler)
-      TestUtils.waitUntilTrue(() => {
-        completionHandler.completed.get() || completionHandler.timedOut.get()
-      }, "Timed out while waiting for broker to controller API call")
-      // we do not expect a timeout from broker to controller request
-      assertFalse(completionHandler.timedOut.get(), "broker to controller 
request is timeout")
-      assertTrue(completionHandler.actualResponse.isDefined, "No response 
recorded even though request is completed")
-      val response = completionHandler.actualResponse.get
-      assertNull(response.authenticationException(), s"Request failed due to 
authentication error ${response.authenticationException}")
-      assertNull(response.versionMismatch(), s"Request failed due to 
unsupported version error ${response.versionMismatch}")
-      assertFalse(response.wasDisconnected(), "Request failed because broker 
is not available")
-    }
-
     val group_id = new AtomicInteger(1)
     def next_group_name(): String = 
s"alter-truststore-${group_id.getAndIncrement()}"
 
@@ -518,18 +489,6 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
     TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2, 
perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
     verifySslProduceConsume(sslProperties2, next_group_name())
     waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
-
-    if (!isKRaftTest()) {
-      val controller = servers.find(_.config.brokerId == 
TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
-      val controllerChannelManager = 
controller.kafkaController.controllerChannelManager
-      val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
-        JTestUtils.fieldValue(controllerChannelManager, 
classOf[ControllerChannelManager], "brokerStateInfo")
-      brokerStateInfo(0).networkClient.disconnect("0")
-      TestUtils.createTopic(zkClient, "testtopic2", numPartitions, 
replicationFactor = numServers, servers)
-
-      // validate that the brokerToController request works fine
-      verifyBrokerToControllerCall(controller)
-    }
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 90f2b3e1979..e05ea8b18a0 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions, 
NewTopic}
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
 import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.apache.kafka.server.common.AdminOperationException
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -116,21 +115,8 @@ class AddPartitionsTest extends BaseRequestTest {
       admin.createPartitions(Collections.singletonMap(topic1,
         NewPartitions.increaseTo(3, singletonList(asList(0, 1, 
2))))).all().get()).getCause
     assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
-    if (isKRaftTest()) {
-      assertTrue(cause.getMessage.contains("Attempted to add 2 additional 
partition(s), but only 1 assignment(s) " +
-        "were specified."), "Unexpected error message: " + cause.getMessage)
-    } else {
-      assertTrue(cause.getMessage.contains("Increasing the number of 
partitions by 2 but 1 assignments provided."),
-        "Unexpected error message: " + cause.getMessage)
-    }
-    if (!isKRaftTest()) {
-      // In ZK mode, test the raw AdminZkClient method as well.
-      val e = assertThrows(classOf[AdminOperationException], () => 
adminZkClient.addPartitions(
-        topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
-        Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
-      assertTrue(e.getMessage.contains("Unexpected existing replica assignment 
for topic 'new-topic5', partition " +
-        "id 0 is missing"))
-    }
+    assertTrue(cause.getMessage.contains("Attempted to add 2 additional 
partition(s), but only 1 assignment(s) " +
+      "were specified."), "Unexpected error message: " + cause.getMessage)
   }
 
   @ParameterizedTest
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index acc2cf854a1..4bb0eccbb18 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -248,7 +248,7 @@ class MetricsTest extends KafkaServerTestHarness with 
Logging {
   @ValueSource(strings = Array("kraft"))
   def testSessionExpireListenerMetrics(quorum: String): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-    val expectedNumMetrics = if (isKRaftTest()) 0 else 1
+    val expectedNumMetrics = 0
     assertEquals(expectedNumMetrics, metrics.keySet.asScala.
       count(_.getMBeanName == 
"kafka.server:type=SessionExpireListener,name=SessionState"))
     assertEquals(expectedNumMetrics, metrics.keySet.asScala.
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 6901933bff0..4a119cdd5e6 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -137,10 +137,6 @@ abstract class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
         }
       }
 
-      if (!isKRaftTest()) {
-        // Verify controller broker has the correct metadata
-        verifyMetadata(controllerSocketServer)
-      }
       if (!request.data.validateOnly) {
         // Wait until metadata is propagated and validate non-controller 
broker has the correct metadata
         TestUtils.waitForPartitionMetadata(brokers, topic.name(), 0)
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
index ebe4bd05f3f..b0f0f74e88d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
@@ -54,17 +54,9 @@ abstract class AbstractMetadataRequestTest extends 
BaseRequestTest {
   }
 
   protected def checkAutoCreatedTopic(autoCreatedTopic: String, response: 
MetadataResponse): Unit = {
-    if (isKRaftTest()) {
-      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.errors.get(autoCreatedTopic))
-      for (i <- 0 until brokers.head.config.numPartitions) {
-        TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
-      }
-    } else {
-      assertEquals(Errors.LEADER_NOT_AVAILABLE, 
response.errors.get(autoCreatedTopic))
-      assertEquals(Some(brokers.head.config.numPartitions), 
zkClient.getTopicPartitionCount(autoCreatedTopic))
-      for (i <- 0 until brokers.head.config.numPartitions) {
-        TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
-      }
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.errors.get(autoCreatedTopic))
+    for (i <- 0 until brokers.head.config.numPartitions) {
+      TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
     }
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index de2e80aace0..f63434a2561 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -387,24 +387,17 @@ class ControllerMutationQuotaTest extends BaseRequestTest 
{
 
   private def waitUserQuota(user: String, expectedQuota: Double): Unit = {
     val quotaManager = brokers.head.quotaManagers.controllerMutation
-    val controllerQuotaManager =
-      if (isKRaftTest()) 
Option(controllerServers.head.quotaManagers.controllerMutation)
-      else Option.empty
+    val controllerQuotaManager = 
controllerServers.head.quotaManagers.controllerMutation
     var actualQuota = Double.MinValue
 
     TestUtils.waitUntilTrue(() => {
       actualQuota = quotaManager.quota(user, "").bound()
-      if (controllerQuotaManager.isDefined)
-        expectedQuota == actualQuota && expectedQuota == 
controllerQuotaManager.get.quota(user, "").bound()
-      else
-        expectedQuota == actualQuota
+      expectedQuota == actualQuota && expectedQuota == 
controllerQuotaManager.quota(user, "").bound()
     }, s"Quota of $user is not $expectedQuota but $actualQuota")
   }
 
   private def quotaMetric(user: String): Option[KafkaMetric] = {
-    val metrics =
-      if (isKRaftTest()) controllerServers.head.metrics
-      else brokers.head.metrics
+    val metrics = controllerServers.head.metrics
     val metricName = metrics.metricName(
       "tokens",
       QuotaType.CONTROLLER_MUTATION.toString,
@@ -449,7 +442,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
     connectAndReceive[AlterClientQuotasResponse](
       request,
       destination = controllerSocketServer,
-      if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else 
listenerName
+      ListenerName.normalised("CONTROLLER")
     )
   }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 09d69b9bfce..96ebfd66683 100644
--- 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -104,31 +104,19 @@ class CreateTopicsRequestWithPolicyTest extends 
AbstractCreateTopicsRequestTest
       Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
         Some("Topic 'existing-topic' already exists."))))
 
-    var errorMsg = if (isKRaftTest()) {
-      "Unable to replicate the partition 4 time(s): The target replication 
factor of 4 cannot be reached because only 3 broker(s) are registered."
-    } else {
-      "Replication factor: 4 larger than available brokers: 3."
-    }
+    var errorMsg = "Unable to replicate the partition 4 time(s): The target 
replication factor of 4 cannot be reached because only 3 broker(s) are 
registered."
     
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
       numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly 
= true),
       Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
         Some(errorMsg))))
 
-    errorMsg = if (isKRaftTest()) {
-      "Replication factor must be larger than 0, or -1 to use the default 
value."
-    } else {
-      "Replication factor must be larger than 0."
-    }
+    errorMsg = "Replication factor must be larger than 0, or -1 to use the 
default value."
     
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
       numPartitions = 10, replicationFactor = -2)), validateOnly = true),
       Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
         Some(errorMsg))))
 
-    errorMsg = if (isKRaftTest()) {
-      "Number of partitions was set to an invalid non-positive value."
-    } else {
-      "Number of partitions must be larger than 0."
-    }
+    errorMsg = "Number of partitions was set to an invalid non-positive value."
     
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
       numPartitions = -2, replicationFactor = 1)), validateOnly = true),
       Map("error-partitions" -> error(Errors.INVALID_PARTITIONS,
diff --git 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index bb0a96e1c34..ec7cdffa1ef 100644
--- 
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -71,7 +71,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends 
BaseRequestTest {
     connectAndReceive[DeleteTopicsResponse](
       request,
       controllerSocketServer,
-      if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else 
listenerName
+      ListenerName.normalised("CONTROLLER")
     )
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index f053d22df12..6e43f904c11 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -69,11 +69,6 @@ class DescribeClusterRequestTest extends BaseRequestTest {
         .setRack(server.config.rack.orNull)
     }.toSet
 
-    var expectedControllerId = 0
-    if (!isKRaftTest()) {
-      // in KRaft mode DescribeClusterRequest will return a random broker id 
as the controllerId (KIP-590)
-      expectedControllerId = 
servers.filter(_.kafkaController.isActive).last.config.brokerId
-    }
     val expectedClusterId = brokers.last.clusterId
 
     val expectedClusterAuthorizedOperations = if 
(includeClusterAuthorizedOperations) {
@@ -92,11 +87,7 @@ class DescribeClusterRequestTest extends BaseRequestTest {
         .build(version.toShort)
       val describeClusterResponse = 
sentDescribeClusterRequest(describeClusterRequest)
 
-      if (isKRaftTest()) {
-        assertTrue(0 to brokerCount contains 
describeClusterResponse.data.controllerId)
-      } else {
-        assertEquals(expectedControllerId, 
describeClusterResponse.data.controllerId)
-      }
+      assertTrue(0 to brokerCount contains 
describeClusterResponse.data.controllerId)
       assertEquals(expectedClusterId, describeClusterResponse.data.clusterId)
       assertEquals(expectedClusterAuthorizedOperations, 
describeClusterResponse.data.clusterAuthorizedOperations)
       assertEquals(expectedBrokers, 
describeClusterResponse.data.brokers.asScala.toSet)
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index eb108b2dd9e..e867f606235 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.GroupConfig
-import org.apache.kafka.server.config.{ConfigType, QuotaConfig, 
ServerLogConfigs}
+import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
 import org.apache.kafka.storage.internals.log.LogConfig
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{Test, Timeout}
@@ -63,10 +63,6 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness 
{
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testConfigChange(quorum: String): Unit = {
-    if (!isKRaftTest()) {
-      
assertTrue(this.servers.head.dynamicConfigHandlers.contains(ConfigType.TOPIC),
-        "Should contain a ConfigHandler for topics")
-    }
     val oldVal: java.lang.Long = 100000L
     val newVal: java.lang.Long = 200000L
     val tp = new TopicPartition("test", 0)
@@ -78,22 +74,20 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
       assertTrue(logOpt.isDefined)
       assertEquals(oldVal, logOpt.get.config.flushInterval)
     }
-    if (isKRaftTest()) {
-      val admin = createAdminClient()
-      try {
-        val resource = new ConfigResource(ConfigResource.Type.TOPIC, 
tp.topic())
-        val op = new AlterConfigOp(new 
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, newVal.toString),
-          OpType.SET)
-        val resource2 = new ConfigResource(ConfigResource.Type.BROKER, "")
-        val op2 = new AlterConfigOp(new 
ConfigEntry(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, newVal.toString),
-          OpType.SET)
-        admin.incrementalAlterConfigs(Map(
-          resource -> List(op).asJavaCollection,
-          resource2 -> List(op2).asJavaCollection,
-        ).asJava).all.get
-      } finally {
-        admin.close()
-      }
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
+      val op = new AlterConfigOp(new 
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, newVal.toString),
+        OpType.SET)
+      val resource2 = new ConfigResource(ConfigResource.Type.BROKER, "")
+      val op2 = new AlterConfigOp(new 
ConfigEntry(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, newVal.toString),
+        OpType.SET)
+      admin.incrementalAlterConfigs(Map(
+        resource -> List(op).asJavaCollection,
+        resource2 -> List(op2).asJavaCollection,
+      ).asJava).all.get
+    } finally {
+      admin.close()
     }
     TestUtils.retry(10000) {
       assertEquals(newVal, 
this.brokers.head.logManager.getLog(tp).get.config.flushInterval)
@@ -115,16 +109,14 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
 
     val newSegmentSize = 2000
-    if (isKRaftTest()) {
-      val admin = createAdminClient()
-      try {
-        val resource = new ConfigResource(ConfigResource.Type.TOPIC, 
tp.topic())
-        val op = new AlterConfigOp(new 
ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
-          OpType.SET)
-        admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
-      } finally {
-        admin.close()
-      }
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
+      val op = new AlterConfigOp(new 
ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
+        OpType.SET)
+      admin.incrementalAlterConfigs(Map(resource -> 
List(op).asJavaCollection).asJava).all.get
+    } finally {
+      admin.close()
     }
     val log = brokers.head.logManager.getLog(tp).get
     TestUtils.retry(10000) {
@@ -241,18 +233,16 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
   @ValueSource(strings = Array("kraft"))
   def testIpQuotaInitialization(quorum: String): Unit = {
     val broker = brokers.head
-    if (isKRaftTest()) {
-      val admin = createAdminClient()
-      try {
-        val alterations = util.Arrays.asList(
-          new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP, 
null)),
-            singletonList(new 
Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 20))),
-          new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP, 
"1.2.3.4")),
-            singletonList(new 
Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 10))))
-        admin.alterClientQuotas(alterations).all().get()
-      } finally {
-        admin.close()
-      }
+    val admin = createAdminClient()
+    try {
+      val alterations = util.Arrays.asList(
+        new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP, 
null)),
+          singletonList(new Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 
20))),
+        new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP, 
"1.2.3.4")),
+          singletonList(new Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 
10))))
+      admin.alterClientQuotas(alterations).all().get()
+    } finally {
+      admin.close()
     }
     TestUtils.retry(10000) {
       val connectionQuotas = broker.socketServer.connectionQuotas
@@ -355,20 +345,18 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
   private def setBrokerConfigs(brokerId: String, newValue: Long): Unit = 
alterBrokerConfigs(brokerId, newValue, OpType.SET)
   private def deleteBrokerConfigs(brokerId: String): Unit = 
alterBrokerConfigs(brokerId, 0, OpType.DELETE)
   private def alterBrokerConfigs(brokerId: String, newValue: Long, op: 
OpType): Unit = {
-    if (isKRaftTest()) {
-      val admin = createAdminClient()
-      try {
-        val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
-        val configOp = new AlterConfigOp(new 
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString), op)
-        val configOp2 = new AlterConfigOp(new 
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString), op)
-        val configOp3 = new AlterConfigOp(new 
ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, 
newValue.toString), op)
-        val configOps = List(configOp, configOp2, configOp3).asJavaCollection
-        admin.incrementalAlterConfigs(Map(
-          resource -> configOps,
-        ).asJava).all.get
-      } finally {
-        admin.close()
-      }
+    val admin = createAdminClient()
+    try {
+      val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+      val configOp = new AlterConfigOp(new 
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString), op)
+      val configOp2 = new AlterConfigOp(new 
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG, 
newValue.toString), op)
+      val configOp3 = new AlterConfigOp(new 
ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG, 
newValue.toString), op)
+      val configOps = List(configOp, configOp2, configOp3).asJavaCollection
+      admin.incrementalAlterConfigs(Map(
+        resource -> configOps,
+      ).asJava).all.get
+    } finally {
+      admin.close()
     }
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 68820f84762..a97cea67c5c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -84,13 +84,8 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
   @ValueSource(strings = Array("kraft"))
   def testMetricsContextNamespacePresent(quorum: String): Unit = {
     assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
-    if (isKRaftTest()) {
-      assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
-      assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
-    } else {
-      
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
-      assertNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
-    }
+    assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+    assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
     assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
 
     broker.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 1a26139cb32..3a1043198fa 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -137,12 +137,10 @@ class ServerShutdownTest extends KafkaServerTestHarness {
   @ParameterizedTest
   @ValueSource(strings = Array("kraft"))
   def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
-    if (isKRaftTest()) {
-      
propsToChangeUponRestart.setProperty(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG,
 "1000")
-      shutdownBroker()
-      shutdownKRaftController()
-      verifyCleanShutdownAfterFailedStartup[CancellationException]
-    }
+    
propsToChangeUponRestart.setProperty(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG,
 "1000")
+    shutdownBroker()
+    shutdownKRaftController()
+    verifyCleanShutdownAfterFailedStartup[CancellationException]
   }
 
   @ParameterizedTest
@@ -195,7 +193,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
       // goes wrong so that awaitShutdown doesn't hang
       case e: Exception =>
         assertCause(exceptionClassTag.runtimeClass, e)
-        assertEquals(if (isKRaftTest()) BrokerState.SHUTTING_DOWN else 
BrokerState.NOT_RUNNING, brokers.head.brokerState)
+        assertEquals(BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
     } finally {
       shutdownBroker()
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index dd3176b9cb8..b943dd5bff6 100644
--- 
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -151,11 +151,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness 
with Logging {
   def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {
     //Setup: we are only interested in the single partition on broker 101
     brokers += createBroker(fromProps(createBrokerConfig(100, 
zkConnectOrNull)))
-    if (isKRaftTest()) {
-      assertEquals(controllerServer.config.nodeId, 
waitUntilQuorumLeaderElected(controllerServer))
-    } else {
-      assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
-    }
+    assertEquals(controllerServer.config.nodeId, 
waitUntilQuorumLeaderElected(controllerServer))
 
     brokers += createBroker(fromProps(createBrokerConfig(101, 
zkConnectOrNull)))
 


Reply via email to