lucasbru commented on code in PR #15216: URL: https://github.com/apache/kafka/pull/15216#discussion_r1472657940
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ########## @@ -108,6 +110,7 @@ public class HeartbeatRequestManager implements RequestManager { * sending heartbeat until the next poll. */ private final Timer pollTimer; + private final HeartbeatMetricsManager metricsManager; Review Comment: nit: I'd either group this with the field below or separate this from `pollTimer` with a newline. Looks like the javadoc is referring to both fields like this. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ########## @@ -139,7 +139,6 @@ public void testPollEnsureManualCommitSent() { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0)); commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false); - assertPoll(1, commitRequestManger); Review Comment: Did you remove this on purpose? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -550,22 +558,64 @@ public void testPollTimerExpiration() { when(membershipManager.state()).thenReturn(MemberState.STABLE); time.sleep(maxPollIntervalMs); - NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, pollResult.unsentRequests.size()); + assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs); verify(heartbeatState).reset(); verify(heartbeatRequestState).reset(); verify(membershipManager).transitionToStale(); assertNoHeartbeat(heartbeatRequestManager); heartbeatRequestManager.resetPollTimer(time.milliseconds()); assertTrue(pollTimer.notExpired()); - assertHeartbeat(heartbeatRequestManager); + assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs); } - private void assertHeartbeat(HeartbeatRequestManager hrm) { + @Test + public void testHeartbeatMetrics() { + // setup + coordinatorRequestManager = mock(CoordinatorRequestManager.class); + membershipManager = mock(MembershipManager.class); + heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); + time = new MockTime(); + metrics = new Metrics(time); + heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( + new LogContext(), + time, + 0, // This initial interval should be 0 to ensure heartbeat on the clock + retryBackoffMs, + retryBackoffMaxMs, + 0); + backgroundEventHandler = mock(BackgroundEventHandler.class); + heartbeatRequestManager = createHeartbeatRequestManager( + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + when(membershipManager.state()).thenReturn(MemberState.STABLE); + + assertNotNull(getMetric("heartbeat-response-time-max")); + assertNotNull(getMetric("heartbeat-rate")); + assertNotNull(getMetric("heartbeat-total")); + assertNotNull(getMetric("last-heartbeat-seconds-ago")); + + // test poll + assertHeartbeat(heartbeatRequestManager, 0); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); Review Comment: The mixed use of `DEFAULT_HEARTBEAT_INTERVAL_MS` and `heartbeatIntervalMs` is confusing. Can use `heartbeatIntervalMs` consistently (if we need the extra alias) or just remove the alias and always use `DEFAULT_HEARTBEAT_INTERVAL_MS` ? ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetricsManagerTest.java: ########## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager; +import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics; +import org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager; +import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.Test; + +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class ConsumerMetricsManagerTest { Review Comment: If you think it's worth it splitting each group of metrics in a separate class, I would also split the corresponding unit tests into separate classes in the same `metrics` package, instead of having a combined unit test for all classes. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -671,18 +721,8 @@ private ConsumerConfig config() { return new ConsumerConfig(prop); } - private HeartbeatRequestManager createHeartbeatRequestManager() { - LogContext logContext = new LogContext(); - pollTimer = time.timer(maxPollIntervalMs); - return new HeartbeatRequestManager( - logContext, - pollTimer, - config(), - coordinatorRequestManager, - membershipManager, - heartbeatState, - heartbeatRequestState, - backgroundEventHandler); + private KafkaMetric getMetric(final String name) { + return metrics.metrics().get(metrics.metricName(name, metricGroupPrefix + COORDINATOR_METRICS_SUFFIX)); Review Comment: I'd suggest using the full metric name (including literal suffix) in these tests, like they appear in the documentation, and not reuse the constants from the prod code (otherwise we won't catch if those constants get modified incorrectly). ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -88,7 +82,7 @@ public class CommitRequestManager implements RequestManager, MemberStateListener private final boolean throwOnFetchStableOffsetUnsupported; final PendingRequests pendingRequests; private boolean closing = false; - private Sensor commitSensor; + private OffsetCommitMetricsManager metricsManager; Review Comment: final? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AbstractConsumerMetricsManager.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.metrics; + +import org.apache.kafka.common.metrics.Measurable; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.stats.Meter; +import org.apache.kafka.common.metrics.stats.WindowedCount; + +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX; + +/** + * Base class for different consumer metrics to extend. This class helps to construct the logical group name from the + * given prefix and suffix, and provides a few common utilities. + * + * <p> + * The suffix can be one of the following: + * <ul> + * <li><code>-coordinator-metrics</code>: {@link MetricGroupSuffix#COORDINATOR}</li> + * <li><code>-metrics</code>: {@link MetricGroupSuffix#CONSUMER}</li> + * </ul> + * </p> + */ +public abstract class AbstractConsumerMetricsManager { Review Comment: You don't have to change this, but personally, I don't think we need subclassing here. It seems all we get from it is a bunch of utilities, that could just be imported. Subclassing should be avoided if not necessary, and it creates complications (see, e.g. the current build failure). ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java: ########## @@ -550,22 +558,64 @@ public void testPollTimerExpiration() { when(membershipManager.state()).thenReturn(MemberState.STABLE); time.sleep(maxPollIntervalMs); - NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); - assertEquals(1, pollResult.unsentRequests.size()); + assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs); verify(heartbeatState).reset(); verify(heartbeatRequestState).reset(); verify(membershipManager).transitionToStale(); assertNoHeartbeat(heartbeatRequestManager); heartbeatRequestManager.resetPollTimer(time.milliseconds()); assertTrue(pollTimer.notExpired()); - assertHeartbeat(heartbeatRequestManager); + assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs); } - private void assertHeartbeat(HeartbeatRequestManager hrm) { + @Test + public void testHeartbeatMetrics() { + // setup + coordinatorRequestManager = mock(CoordinatorRequestManager.class); + membershipManager = mock(MembershipManager.class); + heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); + time = new MockTime(); + metrics = new Metrics(time); + heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( + new LogContext(), + time, + 0, // This initial interval should be 0 to ensure heartbeat on the clock + retryBackoffMs, + retryBackoffMaxMs, + 0); + backgroundEventHandler = mock(BackgroundEventHandler.class); + heartbeatRequestManager = createHeartbeatRequestManager( + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + when(membershipManager.state()).thenReturn(MemberState.STABLE); + + assertNotNull(getMetric("heartbeat-response-time-max")); + assertNotNull(getMetric("heartbeat-rate")); + assertNotNull(getMetric("heartbeat-total")); + assertNotNull(getMetric("last-heartbeat-seconds-ago")); + + // test poll + assertHeartbeat(heartbeatRequestManager, 0); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); + assertEquals(1.0, getMetric("heartbeat-total").metricValue()); + assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(heartbeatIntervalMs), getMetric("last-heartbeat-seconds-ago").metricValue()); + + assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); + assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); + assertEquals(2.0, getMetric("heartbeat-total").metricValue()); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); Review Comment: I think here we can test some other value, since we don't really need this to be the heartbeat interval, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org