lucasbru commented on code in PR #15216:
URL: https://github.com/apache/kafka/pull/15216#discussion_r1472657940


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##########
@@ -108,6 +110,7 @@ public class HeartbeatRequestManager implements 
RequestManager {
      * sending heartbeat until the next poll.
      */
     private final Timer pollTimer;
+    private final HeartbeatMetricsManager metricsManager;

Review Comment:
   nit: I'd either group this with the field below or separate it from 
`pollTimer` with a newline. As written, the javadoc above looks like it 
refers to both fields.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##########
@@ -139,7 +139,6 @@ public void testPollEnsureManualCommitSent() {
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
         commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), 
false);
-        assertPoll(1, commitRequestManger);

Review Comment:
   Did you remove this on purpose?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -550,22 +558,64 @@ public void testPollTimerExpiration() {
         when(membershipManager.state()).thenReturn(MemberState.STABLE);
 
         time.sleep(maxPollIntervalMs);
-        NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
-        assertEquals(1, pollResult.unsentRequests.size());
+        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
         verify(heartbeatState).reset();
         verify(heartbeatRequestState).reset();
         verify(membershipManager).transitionToStale();
 
         assertNoHeartbeat(heartbeatRequestManager);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         assertTrue(pollTimer.notExpired());
-        assertHeartbeat(heartbeatRequestManager);
+        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
     }
 
-    private void assertHeartbeat(HeartbeatRequestManager hrm) {
+    @Test
+    public void testHeartbeatMetrics() {
+        // setup
+        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        membershipManager = mock(MembershipManager.class);
+        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+        time = new MockTime();
+        metrics = new Metrics(time);
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            new LogContext(),
+            time,
+            0, // This initial interval should be 0 to ensure heartbeat on the 
clock
+            retryBackoffMs,
+            retryBackoffMaxMs,
+            0);
+        backgroundEventHandler = mock(BackgroundEventHandler.class);
+        heartbeatRequestManager = createHeartbeatRequestManager(
+            coordinatorRequestManager,
+            membershipManager,
+            heartbeatState,
+            heartbeatRequestState,
+            backgroundEventHandler);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+
+        assertNotNull(getMetric("heartbeat-response-time-max"));
+        assertNotNull(getMetric("heartbeat-rate"));
+        assertNotNull(getMetric("heartbeat-total"));
+        assertNotNull(getMetric("last-heartbeat-seconds-ago"));
+
+        // test poll
+        assertHeartbeat(heartbeatRequestManager, 0);
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   The mixed use of `DEFAULT_HEARTBEAT_INTERVAL_MS` and `heartbeatIntervalMs` 
is confusing. Can we use `heartbeatIntervalMs` consistently (if we need the extra 
alias), or just remove the alias and always use `DEFAULT_HEARTBEAT_INTERVAL_MS`?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetricsManagerTest.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
+import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
+import 
org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
+import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.Test;
+
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+public class ConsumerMetricsManagerTest {

Review Comment:
   If you think it's worth splitting each group of metrics into a separate 
class, I would also split the corresponding unit tests into separate classes in 
the same `metrics` package, instead of having a combined unit test for all 
classes.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -671,18 +721,8 @@ private ConsumerConfig config() {
         return new ConsumerConfig(prop);
     }
 
-    private HeartbeatRequestManager createHeartbeatRequestManager() {
-        LogContext logContext = new LogContext();
-        pollTimer = time.timer(maxPollIntervalMs);
-        return new HeartbeatRequestManager(
-            logContext,
-            pollTimer,
-            config(),
-            coordinatorRequestManager,
-            membershipManager,
-            heartbeatState,
-            heartbeatRequestState,
-            backgroundEventHandler);
+    private KafkaMetric getMetric(final String name) {
+        return metrics.metrics().get(metrics.metricName(name, 
metricGroupPrefix + COORDINATOR_METRICS_SUFFIX));

Review Comment:
   I'd suggest using the full metric name (including literal suffix) in these 
tests, like they appear in the documentation, and not reuse the constants from 
the prod code (otherwise we won't catch if those constants get modified 
incorrectly).



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -88,7 +82,7 @@ public class CommitRequestManager implements RequestManager, 
MemberStateListener
     private final boolean throwOnFetchStableOffsetUnsupported;
     final PendingRequests pendingRequests;
     private boolean closing = false;
-    private Sensor commitSensor;
+    private OffsetCommitMetricsManager metricsManager;

Review Comment:
   final?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
+
+/**
+ * Base class for different consumer metrics to extend. This class helps to 
construct the logical group name from the
+ * given prefix and suffix, and provides a few common utilities.
+ *
+ * <p>
+ * The suffix can be one of the following:
+ * <ul>
+ *     <li><code>-coordinator-metrics</code>: {@link 
MetricGroupSuffix#COORDINATOR}</li>
+ *     <li><code>-metrics</code>: {@link MetricGroupSuffix#CONSUMER}</li>
+ * </ul>
+ * </p>
+ */
+public abstract class AbstractConsumerMetricsManager {

Review Comment:
   You don't have to change this, but personally, I don't think we need 
subclassing here. It seems all we get from it is a bunch of utilities that 
could just be imported. Subclassing should be avoided if not necessary, and it 
creates complications (see, e.g., the current build failure).



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java:
##########
@@ -550,22 +558,64 @@ public void testPollTimerExpiration() {
         when(membershipManager.state()).thenReturn(MemberState.STABLE);
 
         time.sleep(maxPollIntervalMs);
-        NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.poll(time.milliseconds());
-        assertEquals(1, pollResult.unsentRequests.size());
+        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
         verify(heartbeatState).reset();
         verify(heartbeatRequestState).reset();
         verify(membershipManager).transitionToStale();
 
         assertNoHeartbeat(heartbeatRequestManager);
         heartbeatRequestManager.resetPollTimer(time.milliseconds());
         assertTrue(pollTimer.notExpired());
-        assertHeartbeat(heartbeatRequestManager);
+        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
     }
 
-    private void assertHeartbeat(HeartbeatRequestManager hrm) {
+    @Test
+    public void testHeartbeatMetrics() {
+        // setup
+        coordinatorRequestManager = mock(CoordinatorRequestManager.class);
+        membershipManager = mock(MembershipManager.class);
+        heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class);
+        time = new MockTime();
+        metrics = new Metrics(time);
+        heartbeatRequestState = new 
HeartbeatRequestManager.HeartbeatRequestState(
+            new LogContext(),
+            time,
+            0, // This initial interval should be 0 to ensure heartbeat on the 
clock
+            retryBackoffMs,
+            retryBackoffMaxMs,
+            0);
+        backgroundEventHandler = mock(BackgroundEventHandler.class);
+        heartbeatRequestManager = createHeartbeatRequestManager(
+            coordinatorRequestManager,
+            membershipManager,
+            heartbeatState,
+            heartbeatRequestState,
+            backgroundEventHandler);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new 
Node(1, "localhost", 9999)));
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+
+        assertNotNull(getMetric("heartbeat-response-time-max"));
+        assertNotNull(getMetric("heartbeat-rate"));
+        assertNotNull(getMetric("heartbeat-total"));
+        assertNotNull(getMetric("last-heartbeat-seconds-ago"));
+
+        // test poll
+        assertHeartbeat(heartbeatRequestManager, 0);
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+        assertEquals(1.0, getMetric("heartbeat-total").metricValue());
+        assertEquals((double) 
TimeUnit.MILLISECONDS.toSeconds(heartbeatIntervalMs), 
getMetric("last-heartbeat-seconds-ago").metricValue());
+
+        assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
+        assertEquals(0.06d, (double) 
getMetric("heartbeat-rate").metricValue(), 0.005d);
+        assertEquals(2.0, getMetric("heartbeat-total").metricValue());
+        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

Review Comment:
   I think here we can test some other value, since we don't really need this 
to be the heartbeat interval, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to