This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 2bf0689 MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681) 2bf0689 is described below commit 2bf06890b9af62b0aa0ed0debc1b36232afe463a Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Tue Mar 13 06:23:00 2018 +0000 MINOR: Use large batches in metrics test for conversion time >= 1ms (#4681) --- .../test/scala/integration/kafka/api/MetricsTest.scala | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala index 80cfeca..baadd66 100644 --- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala +++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala @@ -43,6 +43,8 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { s"${listenerName.value.toLowerCase(Locale.ROOT)}.${JaasTestUtils.KafkaServerContextName}" this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "false") this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableDoc, "false") + this.producerConfig.setProperty(ProducerConfig.LINGER_MS_CONFIG, "10") + this.producerConfig.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1000000") override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) @@ -90,7 +92,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyClientVersionMetrics(this.producers.head.metrics, "Producer") val server = servers.head - verifyBrokerMessageConversionMetrics(server, recordSize) + verifyBrokerMessageConversionMetrics(server, recordSize, tp) verifyBrokerErrorMetrics(servers.head) verifyBrokerZkMetrics(server, topic) @@ -187,7 +189,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { verifyKafkaMetricRecorded("failed-authentication-total", metrics, "Broker", Some("socket-server-metrics")) } - private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int): Unit = { + private def verifyBrokerMessageConversionMetrics(server: KafkaServer, recordSize: Int, tp: TopicPartition): Unit = { val requestMetricsPrefix = "kafka.network:type=RequestMetrics" val requestBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Produce") val tempBytes = verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Produce") @@ -195,7 +197,17 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup { tempBytes >= recordSize) verifyYammerMetricRecorded(s"kafka.server:type=BrokerTopicMetrics,name=ProduceMessageConversionsPerSec") - verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0) + + // Conversion time less than 1 millisecond is reported as zero, so retry with larger batches until time > 0 + var iteration = 0 + TestUtils.retry(5000) { + val conversionTimeMs = yammerMetricValue(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce").asInstanceOf[Double] + if (conversionTimeMs <= 0.0) { + iteration += 1 + sendRecords(producers.head, 1000 * iteration, 100, tp) + } + assertTrue(s"Message conversion time not recorded $conversionTimeMs", conversionTimeMs > 0.0) + } verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch") verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0) -- To stop receiving notification emails like this one, please contact j...@apache.org.