[ 
https://issues.apache.org/jira/browse/KAFKA-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16577689#comment-16577689
 ] 

ASF GitHub Bot commented on KAFKA-7261:
---------------------------------------

rajinisivaram closed pull request #5484: KAFKA-7261: Fix request total metric 
to count requests instead of bytes
URL: https://github.com/apache/kafka/pull/5484
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
index 09263cecae8..91d4461d2b5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Meter.java
@@ -61,6 +61,9 @@ public Meter(SampledStat rateStat, MetricName rateMetricName, 
MetricName totalMe
      * Construct a Meter with provided time unit and provided {@link 
SampledStat} stats for Rate
      */
     public Meter(TimeUnit unit, SampledStat rateStat, MetricName 
rateMetricName, MetricName totalMetricName) {
+        if (!(rateStat instanceof SampledTotal) && !(rateStat instanceof 
Count)) {
+            throw new IllegalArgumentException("Meter is supported only for 
SampledTotal and Count");
+        }
         this.total = new Total();
         this.rate = new Rate(unit, rateStat);
         this.rateMetricName = rateMetricName;
@@ -77,6 +80,8 @@ public Meter(TimeUnit unit, SampledStat rateStat, MetricName 
rateMetricName, Met
     @Override
     public void record(MetricConfig config, double value, long timeMs) {
         rate.record(config, value, timeMs);
-        total.record(config, value, timeMs);
+        // Total metrics with Count stat should record 1.0 (as recorded in the 
count)
+        double totalValue = (rate.stat instanceof Count) ? 1.0 : value;
+        total.record(config, totalValue, timeMs);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java 
b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 355233125bc..55354ac8d64 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -146,4 +146,12 @@ public ByteBuffer payload() {
         return this.buffer;
     }
 
+    /**
+     * Returns the total size of the receive including payload and size buffer
+     * for use in metrics. This is consistent with {@link NetworkSend#size()}
+     */
+    public int size() {
+        return payload().limit() + size.limit();
+    }
+
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 8ca7fff381a..7e32509933e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -862,7 +862,7 @@ private void addToCompletedReceives() {
     private void addToCompletedReceives(KafkaChannel channel, 
Deque<NetworkReceive> stagedDeque) {
         NetworkReceive networkReceive = stagedDeque.poll();
         this.completedReceives.add(networkReceive);
-        this.sensors.recordBytesReceived(channel.id(), 
networkReceive.payload().limit());
+        this.sensors.recordBytesReceived(channel.id(), networkReceive.size());
     }
 
     // only for testing
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 
b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 59bc84e40de..5c75d03b0e6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -465,8 +465,12 @@ public void testRateWindowing() throws Exception {
         Sensor s = metrics.sensor("test.sensor", cfg);
         MetricName rateMetricName = metrics.metricName("test.rate", "grp1");
         MetricName totalMetricName = metrics.metricName("test.total", "grp1");
+        MetricName countRateMetricName = metrics.metricName("test.count.rate", 
"grp1");
+        MetricName countTotalMetricName = 
metrics.metricName("test.count.total", "grp1");
         s.add(new Meter(TimeUnit.SECONDS, rateMetricName, totalMetricName));
-        KafkaMetric totalMetric = 
metrics.metrics().get(metrics.metricName("test.total", "grp1"));
+        s.add(new Meter(TimeUnit.SECONDS, new Count(), countRateMetricName, 
countTotalMetricName));
+        KafkaMetric totalMetric = metrics.metrics().get(totalMetricName);
+        KafkaMetric countTotalMetric = 
metrics.metrics().get(countTotalMetricName);
 
         int sum = 0;
         int count = cfg.samples() - 1;
@@ -484,11 +488,21 @@ public void testRateWindowing() throws Exception {
         // prior to any time passing
         double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + 
cfg.timeWindowMs() / 2) / 1000.0;
 
-        KafkaMetric rateMetric = 
metrics.metrics().get(metrics.metricName("test.rate", "grp1"));
+        KafkaMetric rateMetric = metrics.metrics().get(rateMetricName);
+        KafkaMetric countRateMetric = 
metrics.metrics().get(countRateMetricName);
         assertEquals("Rate(0...2) = 2.666", sum / elapsedSecs, 
rateMetric.value(), EPS);
+        assertEquals("Count rate(0...2) = 0.02666", count / elapsedSecs, 
countRateMetric.value(), EPS);
         assertEquals("Elapsed Time = 75 seconds", elapsedSecs,
                 ((Rate) rateMetric.measurable()).windowSize(cfg, 
time.milliseconds()) / 1000, EPS);
         assertEquals(sum, totalMetric.value(), EPS);
+        assertEquals(count, countTotalMetric.value(), EPS);
+
+        // Verify that rates are expired, but total is cumulative
+        time.sleep(cfg.timeWindowMs() * cfg.samples());
+        assertEquals(0, rateMetric.value(), EPS);
+        assertEquals(0, countRateMetric.value(), EPS);
+        assertEquals(sum, totalMetric.value(), EPS);
+        assertEquals(count, countTotalMetric.value(), EPS);
     }
 
     public static class ConstantMeasurable implements Measurable {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java 
b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 53f9d95a55a..64b7e4e6792 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -120,7 +120,7 @@ public void verifyAuthenticationMetrics(int 
successfulAuthentications, final int
         waitForMetric("failed-authentication", failedAuthentications);
     }
 
-    private void waitForMetric(String name, final double expectedValue) throws 
InterruptedException {
+    public void waitForMetric(String name, final double expectedValue) throws 
InterruptedException {
         final String totalName = name + "-total";
         final String rateName = name + "-rate";
         if (expectedValue == 0.0) {
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 6aef2f7eda6..d70a448df22 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -556,6 +556,27 @@ public void testUnsupportedCiphers() throws Exception {
         server.verifyAuthenticationMetrics(0, 1);
     }
 
+    @Test
+    public void testServerRequestMetrics() throws Exception {
+        String node = "0";
+        server = createEchoServer(SecurityProtocol.SSL);
+        createSelector(sslClientConfigs, 16384, 16384, 16384);
+        InetSocketAddress addr = new InetSocketAddress("localhost", 
server.port());
+        selector.connect(node, addr, 102400, 102400);
+        NetworkTestUtils.waitForChannelReady(selector, node);
+        int messageSize = 1024 * 1024;
+        String message = TestUtils.randomString(messageSize);
+        selector.send(new NetworkSend(node, 
ByteBuffer.wrap(message.getBytes())));
+        while (selector.completedReceives().isEmpty()) {
+            selector.poll(100L);
+        }
+        int totalBytes = messageSize + 4; // including 4-byte size
+        server.waitForMetric("incoming-byte", totalBytes);
+        server.waitForMetric("outgoing-byte", totalBytes);
+        server.waitForMetric("request", 1);
+        server.waitForMetric("response", 1);
+    }
+
     /**
      * selector.poll() should be able to fetch more data than netReadBuffer 
from the socket.
      */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Request and response total metrics record bytes instead of request count
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-7261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7261
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>
> Request and response total metrics seem to be recording total bytes rather 
> than total requests since they record using a common sensor.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to