This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new beb790e256d MINOR: Convert kafka.api.MetricsTest to KRaft (#17744)
beb790e256d is described below
commit beb790e256db15174757c57f4288560be9c5c077
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Sun Nov 10 19:54:26 2024 -0800
MINOR: Convert kafka.api.MetricsTest to KRaft (#17744)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../scala/integration/kafka/api/MetricsTest.scala | 74 +++++++---------------
1 file changed, 22 insertions(+), 52 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index baf0e4bd453..71d2764aee8 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -14,17 +14,17 @@ package kafka.api
import com.yammer.metrics.core.{Gauge, Histogram, Meter}
import kafka.security.JaasTestUtils
-import kafka.server.KafkaServer
+import kafka.server.KafkaBroker
import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
-import org.apache.kafka.common.config.{SaslConfigs, TopicConfig}
+import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.{InvalidTopicException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.authenticator.TestJaasConfig
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs, ZkConfigs}
+import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteStorageMetrics}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.junit.jupiter.api.Assertions._
@@ -32,8 +32,8 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource
+
import java.util.{Locale, Properties}
-import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
class MetricsTest extends IntegrationTestHarness with SaslSetup {
@@ -45,9 +45,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
private val kafkaServerJaasEntryName =
s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME}"
- this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "false")
this.serverConfig.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false")
- this.serverConfig.setProperty(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG, "2.8")
this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10")
// intentionally slow message down conversion via gzip compression to ensure we can measure the time it takes
this.producerConfig.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip")
@@ -65,6 +63,7 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteStorageManager].getName)
this.serverConfig.setProperty(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
classOf[NoOpRemoteLogMetadataManager].getName)
}
+ this.consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic")
verifyNoRequestMetrics("Request metrics not removed in a previous test")
startSasl(jaasSections(kafkaServerSaslMechanisms,
Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
super.setUp(testInfo)
@@ -80,22 +79,21 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
/**
* Verifies some of the metrics of producer, consumer as well as server.
*/
- @nowarn("cat=deprecation")
- @ParameterizedTest(name = "{displayName}.quorum={0}.groupProtocol={1}.systemRemoteStorageEnabled={2}")
- @CsvSource(Array("zk,classic,true", "zk,classic,false"))
- def testMetrics(quorum: String, groupProtocol: String, systemRemoteStorageEnabled: Boolean): Unit = {
- val topic = "topicWithOldMessageFormat"
- val props = new Properties
- props.setProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG, "0.9.0")
- createTopic(topic, numPartitions = 1, replicationFactor = 1, props)
+ @ParameterizedTest(name = "testMetrics with systemRemoteStorageEnabled: {1}")
+ @CsvSource(Array("kraft, true", "kraft, false"))
+ def testMetrics(quorum: String, systemRemoteStorageEnabled: Boolean): Unit = {
+ val topic = "mytopic"
+ createTopic(topic,
+ numPartitions = 1,
+ replicationFactor = 1,
+ listenerName = interBrokerListenerName,
+ adminClientConfig = adminClientConfig)
val tp = new TopicPartition(topic, 0)
// Produce and consume some records
val numRecords = 10
val recordSize = 100000
val prop = new Properties()
- // idempotence producer doesn't support old version of messages
- prop.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false")
val producer = createProducer(configOverrides = prop)
sendRecords(producer, numRecords, recordSize, tp)
@@ -108,10 +106,9 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
verifyClientVersionMetrics(consumer.metrics, "Consumer")
verifyClientVersionMetrics(producer.metrics, "Producer")
- val server = servers.head
- verifyBrokerMessageConversionMetrics(server, recordSize, tp)
- verifyBrokerErrorMetrics(servers.head)
- verifyBrokerZkMetrics(server, topic)
+ val server = brokers.head
+ verifyBrokerMessageMetrics(server, recordSize, tp)
+ verifyBrokerErrorMetrics(server)
generateAuthenticationFailure(tp)
verifyBrokerAuthenticationMetrics(server)
@@ -195,7 +192,7 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
}
}
- private def verifyBrokerAuthenticationMetrics(server: KafkaServer): Unit = {
+ private def verifyBrokerAuthenticationMetrics(server: KafkaBroker): Unit = {
val metrics = server.metrics.metrics
TestUtils.waitUntilTrue(() =>
maxKafkaMetricValue("failed-authentication-total", metrics, "Broker",
Some("socket-server-metrics")) > 0,
@@ -206,15 +203,12 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
verifyKafkaMetricRecorded("failed-authentication-total", metrics,
"Broker", Some("socket-server-metrics"))
}
- private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = {
+ private def verifyBrokerMessageMetrics(server: KafkaBroker, recordSize: Int, tp: TopicPartition): Unit = {
val requestMetricsPrefix = "kafka.network:type=RequestMetrics"
val requestBytes =
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce")
val tempBytes =
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce")
assertTrue(tempBytes >= recordSize, s"Unexpected temporary memory size
requestBytes $requestBytes tempBytes $tempBytes")
-
verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec")
- // if message conversion run too fast, the Math.round(value) may be 0.0,
so using count to check whether the metric is updated
-
assertTrue(yammerHistogram(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").count()
> 0, "MessageConversionsTimeMs count should be > 0")
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch",
value => value == 0.0)
@@ -222,21 +216,7 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
}
- private def verifyBrokerZkMetrics(server: KafkaServer, topic: String): Unit = {
- val histogram =
yammerHistogram("kafka.server:type=ZooKeeperClientMetrics,name=ZooKeeperRequestLatencyMs")
- // Latency is rounded to milliseconds, so check the count instead
- val initialCount = histogram.count
- servers.head.zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
- val newCount = histogram.count
- assertTrue(newCount > initialCount, "ZooKeeper latency not recorded")
-
- val min = histogram.min
- assertTrue(min >= 0, s"Min latency should not be negative: $min")
-
- assertEquals("CONNECTED", yammerMetricValue("SessionState"), s"Unexpected
ZK state")
- }
-
- private def verifyBrokerErrorMetrics(server: KafkaServer): Unit = {
+ private def verifyBrokerErrorMetrics(server: KafkaBroker): Unit = {
def errorMetricCount =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.count(_.getName ==
"ErrorsPerSec")
@@ -255,7 +235,7 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
// Check that error metrics are registered dynamically
val currentErrorMetricCount = errorMetricCount
assertEquals(startErrorMetricCount + 1, currentErrorMetricCount)
- assertTrue(currentErrorMetricCount < 10, s"Too many error metrics $currentErrorMetricCount")
+ assertTrue(currentErrorMetricCount < 14, s"Too many error metrics $currentErrorMetricCount")
try {
consumer.partitionsFor("non-existing-topic")
@@ -300,16 +280,6 @@ class MetricsTest extends IntegrationTestHarness with
SaslSetup {
}
}
- private def yammerHistogram(name: String): Histogram = {
- val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
- val (_, metric) = allMetrics.find { case (n, _) =>
n.getMBeanName.endsWith(name) }
- .getOrElse(fail(s"Unable to find broker metric $name: allMetrics:
${allMetrics.keySet.map(_.getMBeanName)}"))
- metric match {
- case m: Histogram => m
- case m => throw new AssertionError(s"Unexpected broker metric of class
${m.getClass}")
- }
- }
-
private def verifyYammerMetricRecorded(name: String, verify: Double =>
Boolean = d => d > 0): Double = {
val metricValue = yammerMetricValue(name).asInstanceOf[Double]
assertTrue(verify(metricValue), s"Broker metric not recorded correctly for
$name value $metricValue")