This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a57b7d8279e MINOR: Remove some obsolete JUnit tests (#17745)
a57b7d8279e is described below
commit a57b7d8279e8df8d3e1e1d1a71b56270a6a64f61
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Sun Nov 10 17:18:57 2024 -0800
MINOR: Remove some obsolete JUnit tests (#17745)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../FetchRequestBetweenDifferentIbpTest.scala | 158 ---------------------
.../kafka/server/FetchRequestTestDowngrade.scala | 83 -----------
.../MetadataRequestBetweenDifferentIbpTest.scala | 98 -------------
.../FetchRequestWithLegacyMessageFormatTest.scala | 71 ---------
.../server/KafkaMetricReporterClusterIdTest.scala | 119 ----------------
5 files changed, 529 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
deleted file mode 100644
index 793735412f4..00000000000
---
a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.time.Duration
-import java.util.Arrays.asList
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV0,
IBP_2_8_IV1, IBP_3_1_IV0}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.{Map, Seq}
-
-class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest {
-
- override def brokerCount: Int = 3
- override def generateConfigs: Seq[KafkaConfig] = {
- // Brokers should be at most 2 different IBP versions, but for more test
coverage, three are used here.
- Seq(
- createConfig(0, IBP_2_7_IV0),
- createConfig(1, IBP_2_8_IV1),
- createConfig(2, IBP_3_1_IV0)
- )
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
- def testControllerOldIBP(quorum: String, groupProtocol: String): Unit = {
- // Ensure controller version < IBP_2_8_IV1, and then create a topic where
leader of partition 0 is not the controller,
- // leader of partition 1 is.
- testControllerWithGivenIBP(IBP_2_7_IV0, 0)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
- def testControllerNewIBP(quorum: String, groupProtocol: String): Unit = {
- // Ensure controller version = IBP_3_1_IV0, and then create a topic where
leader of partition 1 is the old version.
- testControllerWithGivenIBP(IBP_3_1_IV0, 2)
- }
-
- def testControllerWithGivenIBP(version: MetadataVersion, controllerBroker:
Int): Unit = {
- val topic = "topic"
- val producer = createProducer()
- val consumer = createConsumer()
-
- ensureControllerWithIBP(version)
- assertEquals(controllerBroker, controllerSocketServer.config.brokerId)
- val partitionLeaders = createTopicWithAssignment(topic, Map(0 -> Seq(1,
0, 2), 1 -> Seq(0, 2, 1)))
- TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
-
- assertEquals(1, partitionLeaders(0))
- assertEquals(0, partitionLeaders(1))
-
- val record1 = new ProducerRecord(topic, 0, null, "key".getBytes,
"value".getBytes)
- val record2 = new ProducerRecord(topic, 1, null, "key".getBytes,
"value".getBytes)
- producer.send(record1)
- producer.send(record2)
-
- consumer.assign(asList(new TopicPartition(topic, 0), new
TopicPartition(topic, 1)))
- val count = consumer.poll(Duration.ofMillis(5000)).count() +
consumer.poll(Duration.ofMillis(5000)).count()
- assertEquals(2, count)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
- def testControllerNewToOldIBP(quorum: String, groupProtocol: String): Unit =
{
- testControllerSwitchingIBP(IBP_3_1_IV0, 2, IBP_2_7_IV0, 0)
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
- def testControllerOldToNewIBP(quorum: String, groupProtocol: String): Unit =
{
- testControllerSwitchingIBP(IBP_2_7_IV0, 0, IBP_3_1_IV0, 2)
- }
-
-
- def testControllerSwitchingIBP(version1: MetadataVersion, broker1: Int,
version2: MetadataVersion, broker2: Int): Unit = {
- val topic = "topic"
- val topic2 = "topic2"
- val producer = createProducer()
- val consumer = createConsumer()
-
- // Ensure controller version = version1
- ensureControllerWithIBP(version1)
- assertEquals(broker1, controllerSocketServer.config.brokerId)
- val partitionLeaders = createTopicWithAssignment(topic, Map(0 -> Seq(1,
0, 2), 1 -> Seq(0, 2, 1)))
- TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
- assertEquals(1, partitionLeaders(0))
- assertEquals(0, partitionLeaders(1))
-
- val record1 = new ProducerRecord(topic, 0, null, "key".getBytes,
"value".getBytes)
- val record2 = new ProducerRecord(topic, 1, null, "key".getBytes,
"value".getBytes)
- producer.send(record1)
- producer.send(record2)
-
- consumer.assign(asList(new TopicPartition(topic, 0), new
TopicPartition(topic, 1)))
-
- val count = consumer.poll(Duration.ofMillis(5000)).count() +
consumer.poll(Duration.ofMillis(5000)).count()
- assertEquals(2, count)
-
- // Make controller version2
- ensureControllerWithIBP(version2)
- assertEquals(broker2, controllerSocketServer.config.brokerId)
- // Create a new topic
- createTopicWithAssignment(topic2, Map(0 -> Seq(1, 0, 2)))
- TestUtils.waitForAllPartitionsMetadata(servers, topic2, 1)
- TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
-
- val record3 = new ProducerRecord(topic2, 0, null, "key".getBytes,
"value".getBytes)
- val record4 = new ProducerRecord(topic, 1, null, "key".getBytes,
"value".getBytes)
- producer.send(record3)
- producer.send(record4)
-
- // Assign this new topic in addition to the old topics.
- consumer.assign(asList(new TopicPartition(topic, 0), new
TopicPartition(topic, 1), new TopicPartition(topic2, 0)))
-
- val count2 = consumer.poll(Duration.ofMillis(5000)).count() +
consumer.poll(Duration.ofMillis(5000)).count()
- assertEquals(2, count2)
- }
-
- private def ensureControllerWithIBP(version: MetadataVersion): Unit = {
- val nonControllerServers =
servers.filter(_.config.interBrokerProtocolVersion != version)
- nonControllerServers.iterator.foreach(server => {
- server.shutdown()
- })
- TestUtils.waitUntilControllerElected(zkClient)
- nonControllerServers.iterator.foreach(server => {
- server.startup()
- })
- }
-
- private def createConfig(nodeId: Int, interBrokerVersion: MetadataVersion):
KafkaConfig = {
- val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
- props.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
interBrokerVersion.version)
- KafkaConfig.fromProps(props)
- }
-
-}
diff --git
a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
deleted file mode 100644
index 6c8bb4e9ccc..00000000000
---
a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.time.Duration
-import java.util.Arrays.asList
-import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.zk.ZkVersion
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV0,
IBP_3_1_IV0}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.{Map, Seq}
-
-class FetchRequestTestDowngrade extends BaseRequestTest {
-
- override def brokerCount: Int = 2
- override def generateConfigs: Seq[KafkaConfig] = {
- // Controller should start with newer IBP and downgrade to the older
one.
- Seq(
- createConfig(0, IBP_3_1_IV0),
- createConfig(1, IBP_2_7_IV0)
- )
- }
-
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
- def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(quorum:
String, groupProtocol: String): Unit = {
- val tp = new TopicPartition("topic", 0)
- val producer = createProducer()
- val consumer = createConsumer()
-
- ensureControllerIn(Seq(0))
- assertEquals(0, controllerSocketServer.config.brokerId)
- val partitionLeaders = createTopicWithAssignment(tp.topic,
Map(tp.partition -> Seq(1, 0)))
- TestUtils.waitForAllPartitionsMetadata(servers, tp.topic, 1)
- ensureControllerIn(Seq(1))
- assertEquals(1, controllerSocketServer.config.brokerId)
-
- assertEquals(1, partitionLeaders(0))
-
- val record1 = new ProducerRecord(tp.topic, tp.partition, null,
"key".getBytes, "value".getBytes)
- producer.send(record1)
-
- consumer.assign(asList(tp))
- val count = consumer.poll(Duration.ofMillis(5000)).count()
- assertEquals(1, count)
- }
-
- private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
- while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
- zkClient.deleteController(ZkVersion.MatchAnyVersion)
- TestUtils.waitUntilControllerElected(zkClient)
- }
- }
-
- private def createConfig(nodeId: Int, interBrokerVersion:
MetadataVersion): KafkaConfig = {
- val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
- props.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
interBrokerVersion.version)
- KafkaConfig.fromProps(props)
- }
-
-}
diff --git
a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
deleted file mode 100644
index afe9d36178f..00000000000
---
a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import kafka.network.SocketServer
-import kafka.utils.TestUtils
-import kafka.zk.ZkVersion
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.message.MetadataRequestData
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.collection.{Map, Seq}
-
-class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
-
- override def brokerCount: Int = 3
- override def generateConfigs: Seq[KafkaConfig] = {
- Seq(
- createConfig(0, IBP_2_8_IV0),
- createConfig(1, MetadataVersion.latestTesting),
- createConfig(2, MetadataVersion.latestTesting)
- )
- }
-
- @Test
- def testUnknownTopicId(): Unit = {
- val topic = "topic"
-
- // Kill controller and restart until broker with latest ibp become
controller
- ensureControllerIn(Seq(1, 2))
- createTopicWithAssignment(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)))
-
- val resp1 = sendMetadataRequest(new MetadataRequest(requestData(topic,
Uuid.ZERO_UUID), 12.toShort), controllerSocketServer)
- val topicId = resp1.topicMetadata.iterator().next().topicId()
-
- // We could still get topic metadata by topicId
- val topicMetadata = sendMetadataRequest(new
MetadataRequest(requestData(null, topicId), 12.toShort), controllerSocketServer)
- .topicMetadata.iterator().next()
- assertEquals(topicId, topicMetadata.topicId())
- assertEquals(topic, topicMetadata.topic())
-
- // Make the broker whose version=IBP_2_8_IV0 controller
- ensureControllerIn(Seq(0))
-
- // Restart the broker whose ibp is higher, and the controller will send
metadata request to it
- killBroker(1)
- restartDeadBrokers()
-
- // Send request to a broker whose ibp is higher and restarted just now
- val resp2 = sendMetadataRequest(new MetadataRequest(requestData(topic,
topicId), 12.toShort), brokerSocketServer(1))
- assertEquals(Errors.UNKNOWN_TOPIC_ID,
resp2.topicMetadata.iterator().next().error())
- }
-
- private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
- while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
- zkClient.deleteController(ZkVersion.MatchAnyVersion)
- TestUtils.waitUntilControllerElected(zkClient)
- }
- }
-
- private def createConfig(nodeId: Int, interBrokerVersion: MetadataVersion):
KafkaConfig = {
- val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
- props.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
interBrokerVersion.version)
- KafkaConfig.fromProps(props)
- }
-
- def requestData(topic: String, topicId: Uuid): MetadataRequestData = {
- val data = new MetadataRequestData
- data.topics.add(new
MetadataRequestData.MetadataRequestTopic().setName(topic).setTopicId(topicId))
- data
- }
-
- private def sendMetadataRequest(request: MetadataRequest, destination:
SocketServer): MetadataResponse = {
- connectAndReceive[MetadataResponse](request, destination)
- }
-
-}
diff --git
a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
deleted file mode 100644
index b9073846149..00000000000
---
a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.Properties
-import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0
-
-import scala.annotation.nowarn
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-
-class FetchRequestWithLegacyMessageFormatTest extends BaseFetchRequestTest {
-
- override def brokerPropertyOverrides(properties: Properties): Unit = {
- super.brokerPropertyOverrides(properties)
- // legacy message formats are only supported with IBP < 3.0
- properties.put(INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
- }
-
- /**
- * Fetch request v2 (pre KIP-74) respected `maxPartitionBytes` even if no
message could be returned
- * due to a message that was larger than `maxPartitionBytes`.
- */
- @nowarn("cat=deprecation")
- @Test
- def testFetchRequestV2WithOversizedMessage(): Unit = {
- initProducer()
- val maxPartitionBytes = 200
- // Fetch v2 down-converts if the message format is >= 0.11 and we want to
avoid
- // that as it affects the size of the returned buffer
- val topicConfig = Map(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG ->
IBP_0_10_2_IV0.version)
- val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1, topicConfig).head
- val topicIds = getTopicIds().asJava
- val topicNames = topicIds.asScala.map(_.swap).asJava
- producer.send(new ProducerRecord(topicPartition.topic,
topicPartition.partition,
- "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
- val fetchVersion: Short = 2
- val fetchRequest = FetchRequest.Builder.forConsumer(fetchVersion,
Int.MaxValue, 0,
- createPartitionMap(maxPartitionBytes,
Seq(topicPartition))).build(fetchVersion)
- val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
- val partitionData = fetchResponse.responseData(topicNames,
fetchVersion).get(topicPartition)
- assertEquals(Errors.NONE.code, partitionData.errorCode)
-
- assertTrue(partitionData.highWatermark > 0)
- assertEquals(maxPartitionBytes, FetchResponse.recordsSize(partitionData))
- assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
- }
-
-}
diff --git
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
deleted file mode 100755
index 9cc02042107..00000000000
---
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicReference
-import kafka.metrics.KafkaMetricsReporter
-import kafka.utils.{CoreUtils, TestUtils, VerifiableProperties}
-import kafka.server.QuorumTestHarness
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
-import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.test.MockMetricsReporter
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
-import org.apache.kafka.test.TestUtils.isValidClusterId
-
-object KafkaMetricReporterClusterIdTest {
- val setupError = new AtomicReference[String]("")
-
- class MockKafkaMetricsReporter extends KafkaMetricsReporter with
ClusterResourceListener {
-
- override def onUpdate(clusterMetadata: ClusterResource): Unit = {
- MockKafkaMetricsReporter.CLUSTER_META.set(clusterMetadata)
- }
-
- override def init(props: VerifiableProperties): Unit = {
- }
- }
-
- object MockKafkaMetricsReporter {
- val CLUSTER_META = new AtomicReference[ClusterResource]
- }
-
- object MockBrokerMetricsReporter {
- val CLUSTER_META: AtomicReference[ClusterResource] = new
AtomicReference[ClusterResource]
- }
-
- class MockBrokerMetricsReporter extends MockMetricsReporter with
ClusterResourceListener {
-
- override def onUpdate(clusterMetadata: ClusterResource): Unit = {
- MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
- }
-
- override def configure(configs: java.util.Map[String, _]): Unit = {
- // Check that the configuration passed to the MetricsReporter includes
the broker id as an Integer.
- // This is a regression test for KAFKA-4756.
- //
- // Because this code is run during the test setUp phase, if we throw an
exception here,
- // it just results in the test itself being declared "not found" rather
than failing.
- // So we track an error message which we will check later in the test
body.
- val brokerId = configs.get(ServerConfigs.BROKER_ID_CONFIG)
- if (brokerId == null)
- setupError.compareAndSet("", "No value was set for the broker id.")
- else if (!brokerId.isInstanceOf[String])
- setupError.compareAndSet("", "The value set for the broker id was not
a string.")
- try
- Integer.parseInt(brokerId.asInstanceOf[String])
- catch {
- case e: Exception => setupError.compareAndSet("", "Error parsing
broker id " + e.toString)
- }
- }
- }
-}
-
-class KafkaMetricReporterClusterIdTest extends QuorumTestHarness {
- var server: KafkaServer = _
- var config: KafkaConfig = _
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- val props = TestUtils.createBrokerConfig(1, zkConnect)
- props.setProperty(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG,
"kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
- props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG,
"kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
- props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
- props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "-1")
- config = KafkaConfig.fromProps(props)
- server = new KafkaServer(config, threadNamePrefix =
Option(this.getClass.getName))
- server.startup()
- }
-
- @Test
- def testClusterIdPresent(): Unit = {
- assertEquals("", KafkaMetricReporterClusterIdTest.setupError.get())
-
-
assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META)
-
isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId())
-
-
assertNotNull(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META)
-
isValidClusterId(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
-
-
assertEquals(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId(),
-
KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
-
- server.shutdown()
- TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- server.shutdown()
- CoreUtils.delete(config.logDirs)
- super.tearDown()
- }
-}