This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a57b7d8279e MINOR: Remove some obsolete JUnit tests (#17745)
a57b7d8279e is described below

commit a57b7d8279e8df8d3e1e1d1a71b56270a6a64f61
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Sun Nov 10 17:18:57 2024 -0800

    MINOR: Remove some obsolete JUnit tests (#17745)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../FetchRequestBetweenDifferentIbpTest.scala      | 158 ---------------------
 .../kafka/server/FetchRequestTestDowngrade.scala   |  83 -----------
 .../MetadataRequestBetweenDifferentIbpTest.scala   |  98 -------------
 .../FetchRequestWithLegacyMessageFormatTest.scala  |  71 ---------
 .../server/KafkaMetricReporterClusterIdTest.scala  | 119 ----------------
 5 files changed, 529 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
 
b/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
deleted file mode 100644
index 793735412f4..00000000000
--- 
a/core/src/test/scala/integration/kafka/server/FetchRequestBetweenDifferentIbpTest.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.time.Duration
-import java.util.Arrays.asList
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV0, 
IBP_2_8_IV1, IBP_3_1_IV0}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.{Map, Seq}
-
-class FetchRequestBetweenDifferentIbpTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 3
-  override def generateConfigs: Seq[KafkaConfig] = {
-    // Brokers should be at most 2 different IBP versions, but for more test 
coverage, three are used here.
-    Seq(
-      createConfig(0, IBP_2_7_IV0),
-      createConfig(1, IBP_2_8_IV1),
-      createConfig(2, IBP_3_1_IV0)
-    )
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
-  def testControllerOldIBP(quorum: String, groupProtocol: String): Unit = {
-    // Ensure controller version < IBP_2_8_IV1, and then create a topic where 
leader of partition 0 is not the controller,
-    // leader of partition 1 is.
-    testControllerWithGivenIBP(IBP_2_7_IV0, 0)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
-  def testControllerNewIBP(quorum: String, groupProtocol: String): Unit = {
-    // Ensure controller version = IBP_3_1_IV0, and then create a topic where 
leader of partition 1 is the old version.
-    testControllerWithGivenIBP(IBP_3_1_IV0, 2)
-  }
-
-  def testControllerWithGivenIBP(version: MetadataVersion, controllerBroker: 
Int): Unit = {
-    val topic = "topic"
-    val producer = createProducer()
-    val consumer = createConsumer()
-
-    ensureControllerWithIBP(version)
-    assertEquals(controllerBroker, controllerSocketServer.config.brokerId)
-    val partitionLeaders = createTopicWithAssignment(topic,  Map(0 -> Seq(1, 
0, 2), 1 -> Seq(0, 2, 1)))
-    TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
-
-    assertEquals(1, partitionLeaders(0))
-    assertEquals(0, partitionLeaders(1))
-
-    val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, 
"value".getBytes)
-    val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, 
"value".getBytes)
-    producer.send(record1)
-    producer.send(record2)
-
-    consumer.assign(asList(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1)))
-    val count = consumer.poll(Duration.ofMillis(5000)).count() + 
consumer.poll(Duration.ofMillis(5000)).count()
-    assertEquals(2, count)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
-  def testControllerNewToOldIBP(quorum: String, groupProtocol: String): Unit = 
{
-    testControllerSwitchingIBP(IBP_3_1_IV0, 2, IBP_2_7_IV0, 0)
-  }
-
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
-  def testControllerOldToNewIBP(quorum: String, groupProtocol: String): Unit = 
{
-    testControllerSwitchingIBP(IBP_2_7_IV0, 0, IBP_3_1_IV0, 2)
-  }
-
-
-  def testControllerSwitchingIBP(version1: MetadataVersion, broker1: Int, 
version2: MetadataVersion, broker2: Int): Unit = {
-    val topic = "topic"
-    val topic2 = "topic2"
-    val producer = createProducer()
-    val consumer = createConsumer()
-
-    // Ensure controller version = version1
-    ensureControllerWithIBP(version1)
-    assertEquals(broker1, controllerSocketServer.config.brokerId)
-    val partitionLeaders = createTopicWithAssignment(topic,  Map(0 -> Seq(1, 
0, 2), 1 -> Seq(0, 2, 1)))
-    TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
-    assertEquals(1, partitionLeaders(0))
-    assertEquals(0, partitionLeaders(1))
-
-    val record1 = new ProducerRecord(topic, 0, null, "key".getBytes, 
"value".getBytes)
-    val record2 = new ProducerRecord(topic, 1, null, "key".getBytes, 
"value".getBytes)
-    producer.send(record1)
-    producer.send(record2)
-
-    consumer.assign(asList(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1)))
-
-    val count = consumer.poll(Duration.ofMillis(5000)).count() + 
consumer.poll(Duration.ofMillis(5000)).count()
-    assertEquals(2, count)
-
-    // Make controller version2
-    ensureControllerWithIBP(version2)
-    assertEquals(broker2, controllerSocketServer.config.brokerId)
-    // Create a new topic
-    createTopicWithAssignment(topic2,  Map(0 -> Seq(1, 0, 2)))
-    TestUtils.waitForAllPartitionsMetadata(servers, topic2, 1)
-    TestUtils.waitForAllPartitionsMetadata(servers, topic, 2)
-
-    val record3 = new ProducerRecord(topic2, 0, null, "key".getBytes, 
"value".getBytes)
-    val record4 = new ProducerRecord(topic, 1, null, "key".getBytes, 
"value".getBytes)
-    producer.send(record3)
-    producer.send(record4)
-
-    // Assign this new topic in addition to the old topics.
-    consumer.assign(asList(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1), new TopicPartition(topic2, 0)))
-
-    val count2 = consumer.poll(Duration.ofMillis(5000)).count() + 
consumer.poll(Duration.ofMillis(5000)).count()
-    assertEquals(2, count2)
-  }
-
-  private def ensureControllerWithIBP(version: MetadataVersion): Unit = {
-    val nonControllerServers = 
servers.filter(_.config.interBrokerProtocolVersion != version)
-    nonControllerServers.iterator.foreach(server => {
-      server.shutdown()
-    })
-    TestUtils.waitUntilControllerElected(zkClient)
-    nonControllerServers.iterator.foreach(server => {
-      server.startup()
-    })
-  }
-
-  private def createConfig(nodeId: Int, interBrokerVersion: MetadataVersion): 
KafkaConfig = {
-    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
-    props.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, 
interBrokerVersion.version)
-    KafkaConfig.fromProps(props)
-  }
-
-}
diff --git 
a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala 
b/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
deleted file mode 100644
index 6c8bb4e9ccc..00000000000
--- 
a/core/src/test/scala/integration/kafka/server/FetchRequestTestDowngrade.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.time.Duration
-import java.util.Arrays.asList
-import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.zk.ZkVersion
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_2_7_IV0, 
IBP_3_1_IV0}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.{Map, Seq}
-
-class FetchRequestTestDowngrade extends BaseRequestTest {
-
-    override def brokerCount: Int = 2
-    override def generateConfigs: Seq[KafkaConfig] = {
-        // Controller should start with newer IBP and downgrade to the older 
one.
-        Seq(
-            createConfig(0, IBP_3_1_IV0),
-            createConfig(1, IBP_2_7_IV0)
-        )
-    }
-
-    @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-    
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
-    def testTopicIdIsRemovedFromFetcherWhenControllerDowngrades(quorum: 
String, groupProtocol: String): Unit = {
-        val tp = new TopicPartition("topic", 0)
-        val producer = createProducer()
-        val consumer = createConsumer()
-
-        ensureControllerIn(Seq(0))
-        assertEquals(0, controllerSocketServer.config.brokerId)
-        val partitionLeaders = createTopicWithAssignment(tp.topic, 
Map(tp.partition -> Seq(1, 0)))
-        TestUtils.waitForAllPartitionsMetadata(servers, tp.topic, 1)
-        ensureControllerIn(Seq(1))
-        assertEquals(1, controllerSocketServer.config.brokerId)
-
-        assertEquals(1, partitionLeaders(0))
-
-        val record1 = new ProducerRecord(tp.topic, tp.partition, null, 
"key".getBytes, "value".getBytes)
-        producer.send(record1)
-
-        consumer.assign(asList(tp))
-        val count = consumer.poll(Duration.ofMillis(5000)).count()
-        assertEquals(1, count)
-    }
-
-    private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
-        while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
-            zkClient.deleteController(ZkVersion.MatchAnyVersion)
-            TestUtils.waitUntilControllerElected(zkClient)
-        }
-    }
-
-    private def createConfig(nodeId: Int, interBrokerVersion: 
MetadataVersion): KafkaConfig = {
-        val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
-        props.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, 
interBrokerVersion.version)
-        KafkaConfig.fromProps(props)
-    }
-
-}
diff --git 
a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
 
b/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
deleted file mode 100644
index afe9d36178f..00000000000
--- 
a/core/src/test/scala/integration/kafka/server/MetadataRequestBetweenDifferentIbpTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import kafka.network.SocketServer
-import kafka.utils.TestUtils
-import kafka.zk.ZkVersion
-import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.message.MetadataRequestData
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
-import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0
-import org.apache.kafka.server.config.ReplicationConfigs
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
-
-import scala.collection.{Map, Seq}
-
-class MetadataRequestBetweenDifferentIbpTest extends BaseRequestTest {
-
-  override def brokerCount: Int = 3
-  override def generateConfigs: Seq[KafkaConfig] = {
-    Seq(
-      createConfig(0, IBP_2_8_IV0),
-      createConfig(1, MetadataVersion.latestTesting),
-      createConfig(2, MetadataVersion.latestTesting)
-    )
-  }
-
-  @Test
-  def testUnknownTopicId(): Unit = {
-    val topic = "topic"
-
-    // Kill controller and restart until broker with latest ibp become 
controller
-    ensureControllerIn(Seq(1, 2))
-    createTopicWithAssignment(topic, Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1)))
-
-    val resp1 = sendMetadataRequest(new MetadataRequest(requestData(topic, 
Uuid.ZERO_UUID), 12.toShort), controllerSocketServer)
-    val topicId = resp1.topicMetadata.iterator().next().topicId()
-
-    // We could still get topic metadata by topicId
-   val topicMetadata = sendMetadataRequest(new 
MetadataRequest(requestData(null, topicId), 12.toShort), controllerSocketServer)
-      .topicMetadata.iterator().next()
-    assertEquals(topicId, topicMetadata.topicId())
-    assertEquals(topic, topicMetadata.topic())
-
-    // Make the broker whose version=IBP_2_8_IV0 controller
-    ensureControllerIn(Seq(0))
-
-    // Restart the broker whose ibp is higher, and the controller will send 
metadata request to it
-    killBroker(1)
-    restartDeadBrokers()
-
-    // Send request to a broker whose ibp is higher and restarted just now
-    val resp2 = sendMetadataRequest(new MetadataRequest(requestData(topic, 
topicId), 12.toShort), brokerSocketServer(1))
-    assertEquals(Errors.UNKNOWN_TOPIC_ID, 
resp2.topicMetadata.iterator().next().error())
-  }
-
-  private def ensureControllerIn(brokerIds: Seq[Int]): Unit = {
-    while (!brokerIds.contains(controllerSocketServer.config.brokerId)) {
-      zkClient.deleteController(ZkVersion.MatchAnyVersion)
-      TestUtils.waitUntilControllerElected(zkClient)
-    }
-  }
-
-  private def createConfig(nodeId: Int, interBrokerVersion: MetadataVersion): 
KafkaConfig = {
-    val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
-    props.put(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, 
interBrokerVersion.version)
-    KafkaConfig.fromProps(props)
-  }
-
-  def requestData(topic: String, topicId: Uuid): MetadataRequestData = {
-    val data = new MetadataRequestData
-    data.topics.add(new 
MetadataRequestData.MetadataRequestTopic().setName(topic).setTopicId(topicId))
-    data
-  }
-
-  private def sendMetadataRequest(request: MetadataRequest, destination: 
SocketServer): MetadataResponse = {
-    connectAndReceive[MetadataResponse](request, destination)
-  }
-
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
 
b/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
deleted file mode 100644
index b9073846149..00000000000
--- 
a/core/src/test/scala/unit/kafka/server/FetchRequestWithLegacyMessageFormatTest.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
-import 
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
-
-import java.util.Properties
-import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0
-
-import scala.annotation.nowarn
-import scala.collection.Seq
-import scala.jdk.CollectionConverters._
-
-class FetchRequestWithLegacyMessageFormatTest extends BaseFetchRequestTest {
-
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-    super.brokerPropertyOverrides(properties)
-    // legacy message formats are only supported with IBP < 3.0
-    properties.put(INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
-  }
-
-  /**
-   * Fetch request v2 (pre KIP-74) respected `maxPartitionBytes` even if no 
message could be returned
-   * due to a message that was larger than `maxPartitionBytes`.
-   */
-  @nowarn("cat=deprecation")
-  @Test
-  def testFetchRequestV2WithOversizedMessage(): Unit = {
-    initProducer()
-    val maxPartitionBytes = 200
-    // Fetch v2 down-converts if the message format is >= 0.11 and we want to 
avoid
-    // that as it affects the size of the returned buffer
-    val topicConfig = Map(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG -> 
IBP_0_10_2_IV0.version)
-    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1, topicConfig).head
-    val topicIds = getTopicIds().asJava
-    val topicNames = topicIds.asScala.map(_.swap).asJava
-    producer.send(new ProducerRecord(topicPartition.topic, 
topicPartition.partition,
-      "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
-    val fetchVersion: Short = 2
-    val fetchRequest = FetchRequest.Builder.forConsumer(fetchVersion, 
Int.MaxValue, 0,
-      createPartitionMap(maxPartitionBytes, 
Seq(topicPartition))).build(fetchVersion)
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
-    val partitionData = fetchResponse.responseData(topicNames, 
fetchVersion).get(topicPartition)
-    assertEquals(Errors.NONE.code, partitionData.errorCode)
-
-    assertTrue(partitionData.highWatermark > 0)
-    assertEquals(maxPartitionBytes, FetchResponse.recordsSize(partitionData))
-    assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
-  }
-
-}
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
deleted file mode 100755
index 9cc02042107..00000000000
--- 
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterClusterIdTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import java.util.concurrent.atomic.AtomicReference
-import kafka.metrics.KafkaMetricsReporter
-import kafka.utils.{CoreUtils, TestUtils, VerifiableProperties}
-import kafka.server.QuorumTestHarness
-import org.apache.kafka.common.{ClusterResource, ClusterResourceListener}
-import org.apache.kafka.server.config.ServerConfigs
-import org.apache.kafka.server.metrics.MetricConfigs
-import org.apache.kafka.test.MockMetricsReporter
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
-import org.apache.kafka.test.TestUtils.isValidClusterId
-
-object KafkaMetricReporterClusterIdTest {
-  val setupError = new AtomicReference[String]("")
-
-  class MockKafkaMetricsReporter extends KafkaMetricsReporter with 
ClusterResourceListener {
-
-    override def onUpdate(clusterMetadata: ClusterResource): Unit = {
-      MockKafkaMetricsReporter.CLUSTER_META.set(clusterMetadata)
-    }
-
-    override def init(props: VerifiableProperties): Unit = {
-    }
-  }
-
-  object MockKafkaMetricsReporter {
-    val CLUSTER_META = new AtomicReference[ClusterResource]
-  }
-
-  object MockBrokerMetricsReporter {
-    val CLUSTER_META: AtomicReference[ClusterResource] = new 
AtomicReference[ClusterResource]
-  }
-
-  class MockBrokerMetricsReporter extends MockMetricsReporter with 
ClusterResourceListener {
-
-    override def onUpdate(clusterMetadata: ClusterResource): Unit = {
-      MockBrokerMetricsReporter.CLUSTER_META.set(clusterMetadata)
-    }
-
-    override def configure(configs: java.util.Map[String, _]): Unit = {
-      // Check that the configuration passed to the MetricsReporter includes 
the broker id as an Integer.
-      // This is a regression test for KAFKA-4756.
-      //
-      // Because this code is run during the test setUp phase, if we throw an 
exception here,
-      // it just results in the test itself being declared "not found" rather 
than failing.
-      // So we track an error message which we will check later in the test 
body.
-      val brokerId = configs.get(ServerConfigs.BROKER_ID_CONFIG)
-      if (brokerId == null)
-        setupError.compareAndSet("", "No value was set for the broker id.")
-      else if (!brokerId.isInstanceOf[String])
-        setupError.compareAndSet("", "The value set for the broker id was not 
a string.")
-      try
-        Integer.parseInt(brokerId.asInstanceOf[String])
-      catch {
-        case e: Exception => setupError.compareAndSet("", "Error parsing 
broker id " + e.toString)
-      }
-    }
-  }
-}
-
-class KafkaMetricReporterClusterIdTest extends QuorumTestHarness {
-  var server: KafkaServer = _
-  var config: KafkaConfig = _
-
-  @BeforeEach
-  override def setUp(testInfo: TestInfo): Unit = {
-    super.setUp(testInfo)
-    val props = TestUtils.createBrokerConfig(1, zkConnect)
-    props.setProperty(MetricConfigs.KAFKA_METRICS_REPORTER_CLASSES_CONFIG, 
"kafka.server.KafkaMetricReporterClusterIdTest$MockKafkaMetricsReporter")
-    props.setProperty(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
"kafka.server.KafkaMetricReporterClusterIdTest$MockBrokerMetricsReporter")
-    props.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "true")
-    props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "-1")
-    config = KafkaConfig.fromProps(props)
-    server = new KafkaServer(config, threadNamePrefix = 
Option(this.getClass.getName))
-    server.startup()
-  }
-
-  @Test
-  def testClusterIdPresent(): Unit = {
-    assertEquals("", KafkaMetricReporterClusterIdTest.setupError.get())
-
-    
assertNotNull(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META)
-    
isValidClusterId(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId())
-
-    
assertNotNull(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META)
-    
isValidClusterId(KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
-
-    
assertEquals(KafkaMetricReporterClusterIdTest.MockKafkaMetricsReporter.CLUSTER_META.get().clusterId(),
-      
KafkaMetricReporterClusterIdTest.MockBrokerMetricsReporter.CLUSTER_META.get().clusterId())
-
-    server.shutdown()
-    TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
-  }
-
-  @AfterEach
-  override def tearDown(): Unit = {
-    server.shutdown()
-    CoreUtils.delete(config.logDirs)
-    super.tearDown()
-  }
-}

Reply via email to