[ https://issues.apache.org/jira/browse/KAFKA-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577689#comment-16577689 ]
ASF GitHub Bot commented on KAFKA-7261: --------------------------------------- rajinisivaram closed pull request #5484: KAFKA-7261: Fix request total metric to count requests instead of bytes URL: https://github.com/apache/kafka/pull/5484 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java index 09263cecae8..91d4461d2b5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java @@ -61,6 +61,9 @@ public Meter(SampledStat rateStat, MetricName rateMetricName, MetricName totalMe * Construct a Meter with provided time unit and provided {@link SampledStat} stats for Rate */ public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, MetricName totalMetricName) { + if (!(rateStat instanceof SampledTotal) && !(rateStat instanceof Count)) { + throw new IllegalArgumentException("Meter is supported only for SampledTotal and Count"); + } this.total = new Total(); this.rate = new Rate(unit, rateStat); this.rateMetricName = rateMetricName; @@ -77,6 +80,8 @@ public Meter(TimeUnit unit, SampledStat rateStat, MetricName rateMetricName, Met @Override public void record(MetricConfig config, double value, long timeMs) { rate.record(config, value, timeMs); - total.record(config, value, timeMs); + // Total metrics with Count stat should record 1.0 (as recorded in the count) + double totalValue = (rate.stat instanceof Count) ? 1.0 : value; + total.record(config, totalValue, timeMs); } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index 355233125bc..55354ac8d64 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -146,4 +146,12 @@ public ByteBuffer payload() { return this.buffer; } + /** + * Returns the total size of the receive including payload and size buffer + * for use in metrics. This is consistent with {@link NetworkSend#size()} + */ + public int size() { + return payload().limit() + size.limit(); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 8ca7fff381a..7e32509933e 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -862,7 +862,7 @@ private void addToCompletedReceives() { private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) { NetworkReceive networkReceive = stagedDeque.poll(); this.completedReceives.add(networkReceive); - this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit()); + this.sensors.recordBytesReceived(channel.id(), networkReceive.size()); } // only for testing diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 59bc84e40de..5c75d03b0e6 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -465,8 +465,12 @@ public void testRateWindowing() throws Exception { Sensor s = metrics.sensor("test.sensor", cfg); MetricName rateMetricName = metrics.metricName("test.rate", "grp1"); MetricName totalMetricName = metrics.metricName("test.total", "grp1"); + MetricName countRateMetricName = metrics.metricName("test.count.rate", "grp1"); + MetricName countTotalMetricName = metrics.metricName("test.count.total", "grp1"); s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName)); - KafkaMetric totalMetric = metrics.metrics().get(metrics.metricName("test.total", "grp1")); + s.add(new Meter(TimeUnit.SECONDS, new Count(), countRateMetricName, countTotalMetricName)); + KafkaMetric totalMetric = metrics.metrics().get(totalMetricName); + KafkaMetric countTotalMetric = metrics.metrics().get(countTotalMetricName); int sum = 0; int count = cfg.samples() - 1; @@ -484,11 +488,21 @@ public void testRateWindowing() throws Exception { // prior to any time passing double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0; - KafkaMetric rateMetric = metrics.metrics().get(metrics.metricName("test.rate", "grp1")); + KafkaMetric rateMetric = metrics.metrics().get(rateMetricName); + KafkaMetric countRateMetric = metrics.metrics().get(countRateMetricName); assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, rateMetric.value(), EPS); + assertEquals("Count rate(0...2) = 0.02666", count / elapsedSecs, countRateMetric.value(), EPS); assertEquals("Elapsed Time = 75 seconds", elapsedSecs, ((Rate) rateMetric.measurable()).windowSize(cfg, time.milliseconds()) / 1000, EPS); assertEquals(sum, totalMetric.value(), EPS); + assertEquals(count, countTotalMetric.value(), EPS); + + // Verify that rates are expired, but total is cumulative + time.sleep(cfg.timeWindowMs() * cfg.samples()); + assertEquals(0, rateMetric.value(), EPS); + assertEquals(0, countRateMetric.value(), EPS); + assertEquals(sum, totalMetric.value(), EPS); + assertEquals(count, countTotalMetric.value(), EPS); } public static class ConstantMeasurable implements Measurable { diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java index 53f9d95a55a..64b7e4e6792 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java +++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java @@ -120,7 +120,7 @@ public void verifyAuthenticationMetrics(int successfulAuthentications, final int waitForMetric("failed-authentication", failedAuthentications); } - private void waitForMetric(String name, final double expectedValue) throws InterruptedException { + public void waitForMetric(String name, final double expectedValue) throws InterruptedException { final String totalName = name + "-total"; final String rateName = name + "-rate"; if (expectedValue == 0.0) { diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 6aef2f7eda6..d70a448df22 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -556,6 +556,27 @@ public void testUnsupportedCiphers() throws Exception { server.verifyAuthenticationMetrics(0, 1); } + @Test + public void testServerRequestMetrics() throws Exception { + String node = "0"; + server = createEchoServer(SecurityProtocol.SSL); + createSelector(sslClientConfigs, 16384, 16384, 16384); + InetSocketAddress addr = new InetSocketAddress("localhost", server.port()); + selector.connect(node, addr, 102400, 102400); + NetworkTestUtils.waitForChannelReady(selector, node); + int messageSize = 1024 * 1024; + String message = TestUtils.randomString(messageSize); + selector.send(new NetworkSend(node, ByteBuffer.wrap(message.getBytes()))); + while (selector.completedReceives().isEmpty()) { + selector.poll(100L); + } + int totalBytes = messageSize + 4; // including 4-byte size + server.waitForMetric("incoming-byte", totalBytes); + server.waitForMetric("outgoing-byte", totalBytes); + server.waitForMetric("request", 1); + server.waitForMetric("response", 1); + } + /** * selector.poll() should be able to fetch more data than netReadBuffer from the socket. */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Request and response total metrics record bytes instead of request count > ------------------------------------------------------------------------ > > Key: KAFKA-7261 > URL: https://issues.apache.org/jira/browse/KAFKA-7261 > Project: Kafka > Issue Type: Bug > Components: metrics > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > > Request and response total metrics seem to be recording total bytes rather > than total requests since they record using a common sensor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)