This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 434fe7c26e3 KAFKA-18081 Remove isKRaftTest from the kraft-only tests
(#17934)
434fe7c26e3 is described below
commit 434fe7c26e31a6cfeb5e17e83fb52a260eac398e
Author: Yung <[email protected]>
AuthorDate: Wed Nov 27 18:37:43 2024 +0800
KAFKA-18081 Remove isKRaftTest from the kraft-only tests (#17934)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/admin/RemoteTopicCrudTest.scala | 12 +--
...merWithLegacyMessageFormatIntegrationTest.scala | 34 ++-----
.../server/DynamicBrokerReconfigurationTest.scala | 43 +--------
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 18 +---
.../scala/unit/kafka/metrics/MetricsTest.scala | 2 +-
.../server/AbstractCreateTopicsRequestTest.scala | 4 -
.../kafka/server/AbstractMetadataRequestTest.scala | 14 +--
.../kafka/server/ControllerMutationQuotaTest.scala | 15 +--
.../server/CreateTopicsRequestWithPolicyTest.scala | 18 +---
...leteTopicsRequestWithDeletionDisabledTest.scala | 2 +-
.../kafka/server/DescribeClusterRequestTest.scala | 11 +--
.../kafka/server/DynamicConfigChangeTest.scala | 102 +++++++++------------
.../kafka/server/KafkaMetricsReporterTest.scala | 9 +-
.../unit/kafka/server/ServerShutdownTest.scala | 12 +--
.../server/epoch/LeaderEpochIntegrationTest.scala | 6 +-
15 files changed, 83 insertions(+), 219 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
index faef809b170..95c0b015ac5 100644
--- a/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/RemoteTopicCrudTest.scala
@@ -504,14 +504,10 @@ class RemoteTopicCrudTest extends IntegrationTestHarness {
val tsDisabledProps = TestUtils.createBrokerConfigs(1,
zkConnectOrNull).head
instanceConfigs = List(KafkaConfig.fromProps(tsDisabledProps))
- if (isKRaftTest()) {
- recreateBrokers(startup = true)
-
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
- // Normally the exception is thrown as part of the TearDown method of
the parent class(es). We would like to not do this.
- faultHandler.setIgnore(true)
- } else {
- assertThrows(classOf[ConfigException], () => recreateBrokers(startup =
true))
- }
+ recreateBrokers(startup = true)
+
assertTrue(faultHandler.firstException().getCause.isInstanceOf[ConfigException])
+ // Normally the exception is thrown as part of the TearDown method of the
parent class(es). We would like to not do this.
+ faultHandler.setIgnore(true)
}
@ParameterizedTest
diff --git
a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
index 5d9455d1b40..ab4cd82d6f6 100644
---
a/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/ConsumerWithLegacyMessageFormatIntegrationTest.scala
@@ -20,7 +20,6 @@ import kafka.utils.TestInfoUtils
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.server.config.ReplicationConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertNull,
assertThrows}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -32,13 +31,6 @@ import scala.jdk.CollectionConverters._
class ConsumerWithLegacyMessageFormatIntegrationTest extends
AbstractConsumerTest {
- override protected def brokerPropertyOverrides(properties: Properties): Unit
= {
- // legacy message formats are only supported with IBP < 3.0
- // KRaft mode is not supported for inter.broker.protocol.version = 2.8,
The minimum version required is 3.0-IV1"
- if (!isKRaftTest())
- properties.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
"2.8")
- }
-
@nowarn("cat=deprecation")
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
@@ -91,22 +83,16 @@ class ConsumerWithLegacyMessageFormatIntegrationTest
extends AbstractConsumerTes
assertEquals(20, timestampTopic1P1.timestamp)
assertEquals(Optional.of(0), timestampTopic1P1.leaderEpoch)
- if (!isKRaftTest()) {
- assertNull(timestampOffsets.get(new TopicPartition(topic2, 0)), "null
should be returned when message format is 0.9.0")
- assertNull(timestampOffsets.get(new TopicPartition(topic2, 1)), "null
should be returned when message format is 0.9.0")
- }
- else {
- // legacy message formats are supported for IBP version < 3.0 and KRaft
runs on minimum version 3.0-IV1
- val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2,
0))
- assertEquals(40, timestampTopic2P0.offset)
- assertEquals(40, timestampTopic2P0.timestamp)
- assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
-
- val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2,
1))
- assertEquals(60, timestampTopic2P1.offset)
- assertEquals(60, timestampTopic2P1.timestamp)
- assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)
- }
+ // legacy message formats are supported for IBP version < 3.0 and KRaft
runs on minimum version 3.0-IV1
+ val timestampTopic2P0 = timestampOffsets.get(new TopicPartition(topic2, 0))
+ assertEquals(40, timestampTopic2P0.offset)
+ assertEquals(40, timestampTopic2P0.timestamp)
+ assertEquals(Optional.of(0), timestampTopic2P0.leaderEpoch)
+
+ val timestampTopic2P1 = timestampOffsets.get(new TopicPartition(topic2, 1))
+ assertEquals(60, timestampTopic2P1.offset)
+ assertEquals(60, timestampTopic2P1.timestamp)
+ assertEquals(Optional.of(0), timestampTopic2P1.leaderEpoch)
val timestampTopic3P0 = timestampOffsets.get(new TopicPartition(topic3, 0))
assertEquals(80, timestampTopic3P0.offset)
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index f19af34500d..b0c86ee4c67 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -30,13 +30,11 @@ import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
import kafka.api.{KafkaSasl, SaslSetup}
-import kafka.controller.{ControllerBrokerStateInfo, ControllerChannelManager}
import kafka.log.UnifiedLog
import kafka.network.{DataPlaneAcceptor, Processor, RequestChannel}
import kafka.security.JaasTestUtils
import kafka.utils._
import kafka.utils.Implicits._
-import kafka.utils.TestUtils.TestControllerRequestCompletionHandler
import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
@@ -52,12 +50,10 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.provider.FileConfigProvider
import org.apache.kafka.common.errors.{AuthenticationException,
InvalidRequestException}
import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.message.MetadataRequestData
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric,
MetricsContext, MetricsReporter, Quota}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.network.CertStores.{KEYSTORE_PROPS,
TRUSTSTORE_PROPS}
import org.apache.kafka.common.record.TimestampType
-import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer,
StringSerializer}
@@ -69,7 +65,7 @@ import org.apache.kafka.server.metrics.{KafkaYammerMetrics,
MetricConfigs}
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
-import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.kafka.test.TestSslUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -333,14 +329,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
assertFalse(reporter.kafkaMetrics.isEmpty, "No metrics found")
}
- if (!isKRaftTest()) {
- // fetch from ZK, values should be unresolved
- val props = fetchBrokerConfigsFromZooKeeper(servers.head)
- assertTrue(props.getProperty(TestMetricsReporter.PollingIntervalProp) ==
PollingIntervalVal, "polling interval is not updated in ZK")
- assertTrue(props.getProperty(configPrefix + SSL_TRUSTSTORE_TYPE_CONFIG)
== SslTruststoreTypeVal, "store type is not updated in ZK")
- assertTrue(props.getProperty(configPrefix +
SSL_KEYSTORE_PASSWORD_CONFIG) == SslKeystorePasswordVal, "keystore password is
not updated in ZK")
- }
-
// verify the update
// 1. verify update not occurring if the value of property is same.
alterConfigsUsingConfigCommand(updatedProps)
@@ -459,23 +447,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
verifyProduceConsume(producer, consumer, 10, topic)
}
- def verifyBrokerToControllerCall(controller: KafkaServer): Unit = {
- val nonControllerBroker = servers.find(_.config.brokerId !=
controller.config.brokerId).get
- val brokerToControllerManager =
nonControllerBroker.clientToControllerChannelManager
- val completionHandler = new TestControllerRequestCompletionHandler()
- brokerToControllerManager.sendRequest(new MetadataRequest.Builder(new
MetadataRequestData()), completionHandler)
- TestUtils.waitUntilTrue(() => {
- completionHandler.completed.get() || completionHandler.timedOut.get()
- }, "Timed out while waiting for broker to controller API call")
- // we do not expect a timeout from broker to controller request
- assertFalse(completionHandler.timedOut.get(), "broker to controller
request is timeout")
- assertTrue(completionHandler.actualResponse.isDefined, "No response
recorded even though request is completed")
- val response = completionHandler.actualResponse.get
- assertNull(response.authenticationException(), s"Request failed due to
authentication error ${response.authenticationException}")
- assertNull(response.versionMismatch(), s"Request failed due to
unsupported version error ${response.versionMismatch}")
- assertFalse(response.wasDisconnected(), "Request failed because broker
is not available")
- }
-
val group_id = new AtomicInteger(1)
def next_group_name(): String =
s"alter-truststore-${group_id.getAndIncrement()}"
@@ -518,18 +489,6 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
TestUtils.incrementalAlterConfigs(servers, adminClients.head, props2,
perBrokerConfig = true).all.get(15, TimeUnit.SECONDS)
verifySslProduceConsume(sslProperties2, next_group_name())
waitForAuthenticationFailure(producerBuilder.keyStoreProps(sslProperties1))
-
- if (!isKRaftTest()) {
- val controller = servers.find(_.config.brokerId ==
TestUtils.waitUntilControllerElected(zkClient)).get.asInstanceOf[KafkaServer]
- val controllerChannelManager =
controller.kafkaController.controllerChannelManager
- val brokerStateInfo: mutable.HashMap[Int, ControllerBrokerStateInfo] =
- JTestUtils.fieldValue(controllerChannelManager,
classOf[ControllerChannelManager], "brokerStateInfo")
- brokerStateInfo(0).networkClient.disconnect("0")
- TestUtils.createTopic(zkClient, "testtopic2", numPartitions,
replicationFactor = numServers, servers)
-
- // validate that the brokerToController request works fine
- verifyBrokerToControllerCall(controller)
- }
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 90f2b3e1979..e05ea8b18a0 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -26,7 +26,6 @@ import org.apache.kafka.clients.admin.{Admin, NewPartitions,
NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.apache.kafka.server.common.AdminOperationException
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -116,21 +115,8 @@ class AddPartitionsTest extends BaseRequestTest {
admin.createPartitions(Collections.singletonMap(topic1,
NewPartitions.increaseTo(3, singletonList(asList(0, 1,
2))))).all().get()).getCause
assertEquals(classOf[InvalidReplicaAssignmentException], cause.getClass)
- if (isKRaftTest()) {
- assertTrue(cause.getMessage.contains("Attempted to add 2 additional
partition(s), but only 1 assignment(s) " +
- "were specified."), "Unexpected error message: " + cause.getMessage)
- } else {
- assertTrue(cause.getMessage.contains("Increasing the number of
partitions by 2 but 1 assignments provided."),
- "Unexpected error message: " + cause.getMessage)
- }
- if (!isKRaftTest()) {
- // In ZK mode, test the raw AdminZkClient method as well.
- val e = assertThrows(classOf[AdminOperationException], () =>
adminZkClient.addPartitions(
- topic5, topic5Assignment, adminZkClient.getBrokerMetadatas(), 2,
- Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2)))))
- assertTrue(e.getMessage.contains("Unexpected existing replica assignment
for topic 'new-topic5', partition " +
- "id 0 is missing"))
- }
+ assertTrue(cause.getMessage.contains("Attempted to add 2 additional
partition(s), but only 1 assignment(s) " +
+ "were specified."), "Unexpected error message: " + cause.getMessage)
}
@ParameterizedTest
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index acc2cf854a1..4bb0eccbb18 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -248,7 +248,7 @@ class MetricsTest extends KafkaServerTestHarness with
Logging {
@ValueSource(strings = Array("kraft"))
def testSessionExpireListenerMetrics(quorum: String): Unit = {
val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
- val expectedNumMetrics = if (isKRaftTest()) 0 else 1
+ val expectedNumMetrics = 0
assertEquals(expectedNumMetrics, metrics.keySet.asScala.
count(_.getMBeanName ==
"kafka.server:type=SessionExpireListener,name=SessionState"))
assertEquals(expectedNumMetrics, metrics.keySet.asScala.
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 6901933bff0..4a119cdd5e6 100644
---
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -137,10 +137,6 @@ abstract class AbstractCreateTopicsRequestTest extends
BaseRequestTest {
}
}
- if (!isKRaftTest()) {
- // Verify controller broker has the correct metadata
- verifyMetadata(controllerSocketServer)
- }
if (!request.data.validateOnly) {
// Wait until metadata is propagated and validate non-controller
broker has the correct metadata
TestUtils.waitForPartitionMetadata(brokers, topic.name(), 0)
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
index ebe4bd05f3f..b0f0f74e88d 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractMetadataRequestTest.scala
@@ -54,17 +54,9 @@ abstract class AbstractMetadataRequestTest extends
BaseRequestTest {
}
protected def checkAutoCreatedTopic(autoCreatedTopic: String, response:
MetadataResponse): Unit = {
- if (isKRaftTest()) {
- assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response.errors.get(autoCreatedTopic))
- for (i <- 0 until brokers.head.config.numPartitions) {
- TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
- }
- } else {
- assertEquals(Errors.LEADER_NOT_AVAILABLE,
response.errors.get(autoCreatedTopic))
- assertEquals(Some(brokers.head.config.numPartitions),
zkClient.getTopicPartitionCount(autoCreatedTopic))
- for (i <- 0 until brokers.head.config.numPartitions) {
- TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
- }
+ assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION,
response.errors.get(autoCreatedTopic))
+ for (i <- 0 until brokers.head.config.numPartitions) {
+ TestUtils.waitForPartitionMetadata(brokers, autoCreatedTopic, i)
}
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
index de2e80aace0..f63434a2561 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerMutationQuotaTest.scala
@@ -387,24 +387,17 @@ class ControllerMutationQuotaTest extends BaseRequestTest
{
private def waitUserQuota(user: String, expectedQuota: Double): Unit = {
val quotaManager = brokers.head.quotaManagers.controllerMutation
- val controllerQuotaManager =
- if (isKRaftTest())
Option(controllerServers.head.quotaManagers.controllerMutation)
- else Option.empty
+ val controllerQuotaManager =
controllerServers.head.quotaManagers.controllerMutation
var actualQuota = Double.MinValue
TestUtils.waitUntilTrue(() => {
actualQuota = quotaManager.quota(user, "").bound()
- if (controllerQuotaManager.isDefined)
- expectedQuota == actualQuota && expectedQuota ==
controllerQuotaManager.get.quota(user, "").bound()
- else
- expectedQuota == actualQuota
+ expectedQuota == actualQuota && expectedQuota ==
controllerQuotaManager.quota(user, "").bound()
}, s"Quota of $user is not $expectedQuota but $actualQuota")
}
private def quotaMetric(user: String): Option[KafkaMetric] = {
- val metrics =
- if (isKRaftTest()) controllerServers.head.metrics
- else brokers.head.metrics
+ val metrics = controllerServers.head.metrics
val metricName = metrics.metricName(
"tokens",
QuotaType.CONTROLLER_MUTATION.toString,
@@ -449,7 +442,7 @@ class ControllerMutationQuotaTest extends BaseRequestTest {
connectAndReceive[AlterClientQuotasResponse](
request,
destination = controllerSocketServer,
- if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else
listenerName
+ ListenerName.normalised("CONTROLLER")
)
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
index 09d69b9bfce..96ebfd66683 100644
---
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
+++
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithPolicyTest.scala
@@ -104,31 +104,19 @@ class CreateTopicsRequestWithPolicyTest extends
AbstractCreateTopicsRequestTest
Map(existingTopic -> error(Errors.TOPIC_ALREADY_EXISTS,
Some("Topic 'existing-topic' already exists."))))
- var errorMsg = if (isKRaftTest()) {
- "Unable to replicate the partition 4 time(s): The target replication
factor of 4 cannot be reached because only 3 broker(s) are registered."
- } else {
- "Replication factor: 4 larger than available brokers: 3."
- }
+ var errorMsg = "Unable to replicate the partition 4 time(s): The target
replication factor of 4 cannot be reached because only 3 broker(s) are
registered."
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication",
numPartitions = 10, replicationFactor = brokerCount + 1)), validateOnly
= true),
Map("error-replication" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some(errorMsg))))
- errorMsg = if (isKRaftTest()) {
- "Replication factor must be larger than 0, or -1 to use the default
value."
- } else {
- "Replication factor must be larger than 0."
- }
+ errorMsg = "Replication factor must be larger than 0, or -1 to use the
default value."
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-replication2",
numPartitions = 10, replicationFactor = -2)), validateOnly = true),
Map("error-replication2" -> error(Errors.INVALID_REPLICATION_FACTOR,
Some(errorMsg))))
- errorMsg = if (isKRaftTest()) {
- "Number of partitions was set to an invalid non-positive value."
- } else {
- "Number of partitions must be larger than 0."
- }
+ errorMsg = "Number of partitions was set to an invalid non-positive value."
validateErrorCreateTopicsRequests(topicsReq(Seq(topicReq("error-partitions",
numPartitions = -2, replicationFactor = 1)), validateOnly = true),
Map("error-partitions" -> error(Errors.INVALID_PARTITIONS,
diff --git
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
index bb0a96e1c34..ec7cdffa1ef 100644
---
a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
+++
b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala
@@ -71,7 +71,7 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends
BaseRequestTest {
connectAndReceive[DeleteTopicsResponse](
request,
controllerSocketServer,
- if (isKRaftTest()) ListenerName.normalised("CONTROLLER") else
listenerName
+ ListenerName.normalised("CONTROLLER")
)
}
diff --git
a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
index f053d22df12..6e43f904c11 100644
--- a/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DescribeClusterRequestTest.scala
@@ -69,11 +69,6 @@ class DescribeClusterRequestTest extends BaseRequestTest {
.setRack(server.config.rack.orNull)
}.toSet
- var expectedControllerId = 0
- if (!isKRaftTest()) {
- // in KRaft mode DescribeClusterRequest will return a random broker id
as the controllerId (KIP-590)
- expectedControllerId =
servers.filter(_.kafkaController.isActive).last.config.brokerId
- }
val expectedClusterId = brokers.last.clusterId
val expectedClusterAuthorizedOperations = if
(includeClusterAuthorizedOperations) {
@@ -92,11 +87,7 @@ class DescribeClusterRequestTest extends BaseRequestTest {
.build(version.toShort)
val describeClusterResponse =
sentDescribeClusterRequest(describeClusterRequest)
- if (isKRaftTest()) {
- assertTrue(0 to brokerCount contains
describeClusterResponse.data.controllerId)
- } else {
- assertEquals(expectedControllerId,
describeClusterResponse.data.controllerId)
- }
+ assertTrue(0 to brokerCount contains
describeClusterResponse.data.controllerId)
assertEquals(expectedClusterId, describeClusterResponse.data.clusterId)
assertEquals(expectedClusterAuthorizedOperations,
describeClusterResponse.data.clusterAuthorizedOperations)
assertEquals(expectedBrokers,
describeClusterResponse.data.brokers.asScala.toSet)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index eb108b2dd9e..e867f606235 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -34,7 +34,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.GroupConfig
-import org.apache.kafka.server.config.{ConfigType, QuotaConfig,
ServerLogConfigs}
+import org.apache.kafka.server.config.{QuotaConfig, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.LogConfig
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{Test, Timeout}
@@ -63,10 +63,6 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness
{
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testConfigChange(quorum: String): Unit = {
- if (!isKRaftTest()) {
-
assertTrue(this.servers.head.dynamicConfigHandlers.contains(ConfigType.TOPIC),
- "Should contain a ConfigHandler for topics")
- }
val oldVal: java.lang.Long = 100000L
val newVal: java.lang.Long = 200000L
val tp = new TopicPartition("test", 0)
@@ -78,22 +74,20 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
assertTrue(logOpt.isDefined)
assertEquals(oldVal, logOpt.get.config.flushInterval)
}
- if (isKRaftTest()) {
- val admin = createAdminClient()
- try {
- val resource = new ConfigResource(ConfigResource.Type.TOPIC,
tp.topic())
- val op = new AlterConfigOp(new
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, newVal.toString),
- OpType.SET)
- val resource2 = new ConfigResource(ConfigResource.Type.BROKER, "")
- val op2 = new AlterConfigOp(new
ConfigEntry(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, newVal.toString),
- OpType.SET)
- admin.incrementalAlterConfigs(Map(
- resource -> List(op).asJavaCollection,
- resource2 -> List(op2).asJavaCollection,
- ).asJava).all.get
- } finally {
- admin.close()
- }
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
+ val op = new AlterConfigOp(new
ConfigEntry(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, newVal.toString),
+ OpType.SET)
+ val resource2 = new ConfigResource(ConfigResource.Type.BROKER, "")
+ val op2 = new AlterConfigOp(new
ConfigEntry(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG, newVal.toString),
+ OpType.SET)
+ admin.incrementalAlterConfigs(Map(
+ resource -> List(op).asJavaCollection,
+ resource2 -> List(op2).asJavaCollection,
+ ).asJava).all.get
+ } finally {
+ admin.close()
}
TestUtils.retry(10000) {
assertEquals(newVal,
this.brokers.head.logManager.getLog(tp).get.config.flushInterval)
@@ -115,16 +109,14 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
}
val newSegmentSize = 2000
- if (isKRaftTest()) {
- val admin = createAdminClient()
- try {
- val resource = new ConfigResource(ConfigResource.Type.TOPIC,
tp.topic())
- val op = new AlterConfigOp(new
ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
- OpType.SET)
- admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all.get
- } finally {
- admin.close()
- }
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
+ val op = new AlterConfigOp(new
ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
+ OpType.SET)
+ admin.incrementalAlterConfigs(Map(resource ->
List(op).asJavaCollection).asJava).all.get
+ } finally {
+ admin.close()
}
val log = brokers.head.logManager.getLog(tp).get
TestUtils.retry(10000) {
@@ -241,18 +233,16 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
@ValueSource(strings = Array("kraft"))
def testIpQuotaInitialization(quorum: String): Unit = {
val broker = brokers.head
- if (isKRaftTest()) {
- val admin = createAdminClient()
- try {
- val alterations = util.Arrays.asList(
- new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP,
null)),
- singletonList(new
Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 20))),
- new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP,
"1.2.3.4")),
- singletonList(new
Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG, 10))))
- admin.alterClientQuotas(alterations).all().get()
- } finally {
- admin.close()
- }
+ val admin = createAdminClient()
+ try {
+ val alterations = util.Arrays.asList(
+ new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP,
null)),
+ singletonList(new Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG,
20))),
+ new ClientQuotaAlteration(new ClientQuotaEntity(singletonMap(IP,
"1.2.3.4")),
+ singletonList(new Op(QuotaConfig.IP_CONNECTION_RATE_OVERRIDE_CONFIG,
10))))
+ admin.alterClientQuotas(alterations).all().get()
+ } finally {
+ admin.close()
}
TestUtils.retry(10000) {
val connectionQuotas = broker.socketServer.connectionQuotas
@@ -355,20 +345,18 @@ class DynamicConfigChangeTest extends
KafkaServerTestHarness {
private def setBrokerConfigs(brokerId: String, newValue: Long): Unit =
alterBrokerConfigs(brokerId, newValue, OpType.SET)
private def deleteBrokerConfigs(brokerId: String): Unit =
alterBrokerConfigs(brokerId, 0, OpType.DELETE)
private def alterBrokerConfigs(brokerId: String, newValue: Long, op:
OpType): Unit = {
- if (isKRaftTest()) {
- val admin = createAdminClient()
- try {
- val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
- val configOp = new AlterConfigOp(new
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
newValue.toString), op)
- val configOp2 = new AlterConfigOp(new
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG,
newValue.toString), op)
- val configOp3 = new AlterConfigOp(new
ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG,
newValue.toString), op)
- val configOps = List(configOp, configOp2, configOp3).asJavaCollection
- admin.incrementalAlterConfigs(Map(
- resource -> configOps,
- ).asJava).all.get
- } finally {
- admin.close()
- }
+ val admin = createAdminClient()
+ try {
+ val resource = new ConfigResource(ConfigResource.Type.BROKER, brokerId)
+ val configOp = new AlterConfigOp(new
ConfigEntry(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG,
newValue.toString), op)
+ val configOp2 = new AlterConfigOp(new
ConfigEntry(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG,
newValue.toString), op)
+ val configOp3 = new AlterConfigOp(new
ConfigEntry(QuotaConfig.REPLICA_ALTER_LOG_DIRS_IO_MAX_BYTES_PER_SECOND_CONFIG,
newValue.toString), op)
+ val configOps = List(configOp, configOp2, configOp3).asJavaCollection
+ admin.incrementalAlterConfigs(Map(
+ resource -> configOps,
+ ).asJava).all.get
+ } finally {
+ admin.close()
}
}
diff --git
a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
index 68820f84762..a97cea67c5c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala
@@ -84,13 +84,8 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@ValueSource(strings = Array("kraft"))
def testMetricsContextNamespacePresent(quorum: String): Unit = {
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
- if (isKRaftTest()) {
- assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
- assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
- } else {
-
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
- assertNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
- }
+ assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
+ assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())
broker.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 1a26139cb32..3a1043198fa 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -137,12 +137,10 @@ class ServerShutdownTest extends KafkaServerTestHarness {
@ParameterizedTest
@ValueSource(strings = Array("kraft"))
def testCleanShutdownAfterFailedStartup(quorum: String): Unit = {
- if (isKRaftTest()) {
-
propsToChangeUponRestart.setProperty(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG,
"1000")
- shutdownBroker()
- shutdownKRaftController()
- verifyCleanShutdownAfterFailedStartup[CancellationException]
- }
+
propsToChangeUponRestart.setProperty(KRaftConfigs.INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG,
"1000")
+ shutdownBroker()
+ shutdownKRaftController()
+ verifyCleanShutdownAfterFailedStartup[CancellationException]
}
@ParameterizedTest
@@ -195,7 +193,7 @@ class ServerShutdownTest extends KafkaServerTestHarness {
// goes wrong so that awaitShutdown doesn't hang
case e: Exception =>
assertCause(exceptionClassTag.runtimeClass, e)
- assertEquals(if (isKRaftTest()) BrokerState.SHUTTING_DOWN else
BrokerState.NOT_RUNNING, brokers.head.brokerState)
+ assertEquals(BrokerState.SHUTTING_DOWN, brokers.head.brokerState)
} finally {
shutdownBroker()
}
diff --git
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index dd3176b9cb8..b943dd5bff6 100644
---
a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -151,11 +151,7 @@ class LeaderEpochIntegrationTest extends QuorumTestHarness
with Logging {
def shouldIncreaseLeaderEpochBetweenLeaderRestarts(quorum: String): Unit = {
//Setup: we are only interested in the single partition on broker 101
brokers += createBroker(fromProps(createBrokerConfig(100,
zkConnectOrNull)))
- if (isKRaftTest()) {
- assertEquals(controllerServer.config.nodeId,
waitUntilQuorumLeaderElected(controllerServer))
- } else {
- assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
- }
+ assertEquals(controllerServer.config.nodeId,
waitUntilQuorumLeaderElected(controllerServer))
brokers += createBroker(fromProps(createBrokerConfig(101,
zkConnectOrNull)))