This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new beb790e256d MINOR: Convert kafka.api.MetricsTest to KRaft (#17744)
beb790e256d is described below

commit beb790e256db15174757c57f4288560be9c5c077
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Sun Nov 10 19:54:26 2024 -0800

    MINOR: Convert kafka.api.MetricsTest to KRaft (#17744)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../scala/integration/kafka/api/MetricsTest.scala  | 74 +++++++---------------
 1 file changed, 22 insertions(+), 52 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala 
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index baf0e4bd453..71d2764aee8 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -14,17 +14,17 @@ package kafka.api
 
 import com.yammer.metrics.core.{Gauge, Histogram, Meter}
 import kafka.security.JaasTestUtils
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
 import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
-import org.apache.kafka.common.config.{SaslConfigs, TopicConfig}
+import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.{InvalidTopicException, 
UnknownTopicOrPartitionException}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.authenticator.TestJaasConfig
 import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs, 
ZkConfigs}
+import org.apache.kafka.server.config.ServerLogConfigs
 import 
org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, 
NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
@@ -32,8 +32,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.CsvSource
 
+
 import java.util.{Locale, Properties}
-import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
 class MetricsTest extends IntegrationTestHarness with SaslSetup {
@@ -45,9 +45,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
   private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
   private val kafkaServerJaasEntryName =
     
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}"
-  this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, 
"false")
   
this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
 "false")
-  
this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG,
 "2.8")
   this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
   // intentionally slow message down conversion via gzip compression to ensure 
we can measure the time it takes
   this.producerConfig.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, 
"gzip")
@@ -65,6 +63,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
       
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
 classOf[NoOpRemoteStorageManager].getName)
       
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
 classOf[NoOpRemoteLogMetadataManager].getName)
     }
+    this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")
     verifyNoRequestMetrics("Request metrics not removed in a previous test")
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
     super.setUp(testInfo)
@@ -80,22 +79,21 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
   /**
    * Verifies some of the metrics of producer, consumer as well as server.
    */
-  @nowarn("cat=deprecation")
-  @ParameterizedTest(name = 
"{displayName}.quorum={0}.groupProtocol={1}.systemRemoteStorageEnabled={2}")
-  @CsvSource(Array("zk,classic,true", "zk,classic,false"))
-  def testMetrics(quorum: String, groupProtocol: String, 
systemRemoteStorageEnabled: Boolean): Unit = {
-    val topic = "topicWithOldMessageFormat"
-    val props = new Properties
-    props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
-    createTopic(topic, numPartitions = 1, replicationFactor = 1, props)
+  @ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {1}")
+  @CsvSource(Array("kraft, true", "kraft, false"))
+  def testMetrics(quorum: String, systemRemoteStorageEnabled: Boolean): Unit = 
{
+    val topic = "mytopic"
+    createTopic(topic,
+      numPartitions = 1,
+      replicationFactor = 1,
+      listenerName = interBrokerListenerName,
+      adminClientConfig = adminClientConfig)
     val tp = new TopicPartition(topic, 0)
 
     // Produce and consume some records
     val numRecords = 10
     val recordSize = 100000
     val prop = new Properties()
-    // idempotence producer doesn't support old version of messages
-    prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
     val producer = createProducer(configOverrides = prop)
     sendRecords(producer, numRecords, recordSize, tp)
 
@@ -108,10 +106,9 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     verifyClientVersionMetrics(consumer.metrics, "Consumer")
     verifyClientVersionMetrics(producer.metrics, "Producer")
 
-    val server = servers.head
-    verifyBrokerMessageConversionMetrics(server, recordSize, tp)
-    verifyBrokerErrorMetrics(servers.head)
-    verifyBrokerZkMetrics(server, topic)
+    val server = brokers.head
+    verifyBrokerMessageMetrics(server, recordSize, tp)
+    verifyBrokerErrorMetrics(server)
 
     generateAuthenticationFailure(tp)
     verifyBrokerAuthenticationMetrics(server)
@@ -195,7 +192,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     }
   }
 
-  private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = {
+  private def verifyBrokerAuthenticationMetrics(server: KafkaBroker): Unit = {
     val metrics = server.metrics.metrics
     TestUtils.waitUntilTrue(() =>
       maxKafkaMetricValue("failed-authentication-total", metrics, "Broker", 
Some("socket-server-metrics")) > 0,
@@ -206,15 +203,12 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     verifyKafkaMetricRecorded("failed-authentication-total", metrics, 
"Broker", Some("socket-server-metrics"))
   }
 
-  private def verifyBrokerMessageConversionMetrics(server: KafkaServer, 
recordSize: Int, tp: TopicPartition): Unit = {
+  private def verifyBrokerMessageMetrics(server: KafkaBroker, recordSize: Int, 
tp: TopicPartition): Unit = {
     val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
     val requestBytes = 
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
     val tempBytes = 
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
     assertTrue(tempBytes >= recordSize, s"Unexpected temporary memory size 
requestBytes $requestBytes tempBytes $tempBytes")
 
-    
verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
-    // if message conversion run too fast, the Math.round(value) may be 0.0, 
so using count to check whether the metric is updated
-    
assertTrue(yammerHistogram(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").count()
 > 0, "MessageConversionsTimeMs count should be > 0")
     
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
     
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch",
 value => value == 0.0)
 
@@ -222,21 +216,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
   }
 
-  private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit 
= {
-    val histogram = 
yammerHistogram("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
-    // Latency is rounded to milliseconds, so check the count instead
-    val initialCount = histogram.count
-    servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
-    val newCount = histogram.count
-    assertTrue(newCount > initialCount, "ZooKeeper latency not recorded")
-
-    val min = histogram.min
-    assertTrue(min >= 0, s"Min latency should not be negative: $min")
-
-    assertEquals("CONNECTED", yammerMetricValue("SessionState"), s"Unexpected 
ZK state")
-  }
-
-  private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
+  private def verifyBrokerErrorMetrics(server: KafkaBroker): Unit = {
 
     def errorMetricCount = 
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.count(_.getName == 
"ErrorsPerSec")
 
@@ -255,7 +235,7 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     // Check that error metrics are registered dynamically
     val currentErrorMetricCount = errorMetricCount
     assertEquals(startErrorMetricCount + 1, currentErrorMetricCount)
-    assertTrue(currentErrorMetricCount < 10, s"Too many error metrics 
$currentErrorMetricCount")
+    assertTrue(currentErrorMetricCount < 14, s"Too many error metrics 
$currentErrorMetricCount")
 
     try {
       consumer.partitionsFor("non-existing-topic")
@@ -300,16 +280,6 @@ class MetricsTest extends IntegrationTestHarness with 
SaslSetup {
     }
   }
 
-  private def yammerHistogram(name: String): Histogram = {
-    val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
-    val (_, metric) = allMetrics.find { case (n, _) => 
n.getMBeanName.endsWith(name) }
-      .getOrElse(fail(s"Unable to find broker metric $name: allMetrics: 
${allMetrics.keySet.map(_.getMBeanName)}"))
-    metric match {
-      case m: Histogram => m
-      case m => throw new AssertionError(s"Unexpected broker metric of class 
${m.getClass}")
-    }
-  }
-
   private def verifyYammerMetricRecorded(name: String, verify: Double => 
Boolean = d => d > 0): Double = {
     val metricValue = yammerMetricValue(name).asInstanceOf[Double]
     assertTrue(verify(metricValue), s"Broker metric not recorded correctly for 
$name value $metricValue")

Reply via email to