[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-19 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1268717074


##
metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {
+private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = 
getMetricName(
+"SnapshotEmitter", "LatestSnapshotGeneratedBytes");
+private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = 
getMetricName(
+"SnapshotEmitter", "LatestSnapshotGeneratedAgeMs");
+
+private final Optional registry;
+private final Time time;
+private final AtomicLong latestSnapshotGeneratedBytes;
+private final AtomicLong latestSnapshotGeneratedTimeMs;
+
+/**
+ * Create a new LoaderMetrics object.
+ *
+ * @param registry  The metrics registry, or Optional.empty if this is a 
test and we don't have one.
+ */
+public SnapshotEmitterMetrics(
+Optional registry,
+Time time,
+long initialLatestSnapshotGeneratedBytes
+) {
+this.registry = registry;
+this.time = time;
+this.latestSnapshotGeneratedBytes = new 
AtomicLong(initialLatestSnapshotGeneratedBytes);
+this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs());
+registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, 
new Gauge() {
+@Override
+public Long value() {
+return latestSnapshotGeneratedBytes();
+}
+}));
+registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, 
new Gauge() {
+@Override
+public Long value() {
+return latestSnapshotGeneratedAgeMs();
+}
+}));
+}
+
+long monoTimeInMs() {
+return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
+}
+
+public void setLatestSnapshotGeneratedBytes(long value) {
+this.latestSnapshotGeneratedBytes.set(value);
+}

Review Comment:
   Fixed, and this is now tested.



##
metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java:
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {


[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-19 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1268716163


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -54,6 +54,14 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
 "KafkaController", "LastAppliedRecordTimestamp");
 private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
 "KafkaController", "LastAppliedRecordLagMs");

Review Comment:
   I'd prefer it this way since we're initializing a bunch of constants here, 
and sometimes we want to grep for stuff.
   
   It's kind of annoying when you have "matryoshka doll initialization" of 
constants.
   
   Like rather than having:
   ```
   static final String FOO = "foo";
   static final String BAR = "bar";
   static final String FOO_BAR = FOO + BAR;
   static final String FOO_BAR_BAZ = FOO_BAR + "baz";
   ```
   
   I'd rather just have:
   ```
   static final String FOO = "foo";
   static final String BAR = "bar";
   static final String FOO_BAR = "foobar";
   static final String FOO_BAR_BAZ = "foobarbaz";
   ```
   
   since then I can grep for the constant and it works as expected.
   
   Although maybe other people disagree?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267289475


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -176,17 +210,53 @@ public long dualWriteOffset() {
 }
 
 public void incrementTimedOutHeartbeats() {
-timedOutHeartbeats.addAndGet(1);
+timedOutHeartbeats.incrementAndGet();
 }
 
-public void setTimedOutHeartbeats(long heartbeats) {
-timedOutHeartbeats.set(heartbeats);
+public void setTimedOutHeartbeats(long value) {
+timedOutHeartbeats.set(value);
 }

Review Comment:
   Yeah, I guess we can remove these



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267289020


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
 return time.milliseconds() - lastAppliedRecordTimestamp();
 }
 }));
-
+registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, 
new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();
+}
+}));
+registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();

Review Comment:
   added a test that should catch this



##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
 return time.milliseconds() - lastAppliedRecordTimestamp();
 }
 }));
-
+registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, 
new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();
+}
+}));
+registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();
+}
+}));
+registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();

Review Comment:
   added a test that should catch this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267287803


##
metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java:
##
@@ -418,24 +415,30 @@ LogDeltaManifest loadLogDelta(
 public void handleLoadSnapshot(SnapshotReader 
reader) {
 eventQueue.append(() -> {
 try {
+long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+String snapshotName = 
Snapshots.filenameFromSnapshotId(reader.snapshotId());
+log.info("handleLoadSnapshot({}): incrementing 
HandleLoadSnapshotCount to {}.",
+snapshotName, numLoaded);
 MetadataDelta delta = new MetadataDelta.Builder().
 setImage(image).
 build();
 SnapshotManifest manifest = loadSnapshot(delta, reader);
-log.info("handleLoadSnapshot: generated a metadata delta from 
a snapshot at offset {} " +
-"in {} us.", 
manifest.provenance().lastContainedOffset(),
+log.info("handleLoadSnapshot({}): generated a metadata delta 
between offset {} " +
+"and this snapshot in {} us.", snapshotName,
+image.provenance().lastContainedOffset(),
 NANOSECONDS.toMicros(manifest.elapsedNs()));
 try {
 image = delta.apply(manifest.provenance());
 } catch (Throwable e) {
 faultHandler.handleFault("Error generating new metadata 
image from " +
-"snapshot at offset " + 
reader.lastContainedLogOffset(), e);
+"snapshot " + snapshotName, e);

Review Comment:
   the offset is included in the snapshot name



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267254133


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
 return time.milliseconds() - lastAppliedRecordTimestamp();
 }
 }));
-
+registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, 
new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();
+}
+}));
+registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();
+}
+}));
+registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();

Review Comment:
   good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267253682


##
metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java:
##
@@ -113,7 +124,30 @@ public Long value() {
 return time.milliseconds() - lastAppliedRecordTimestamp();
 }
 }));
-
+registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, 
new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();
+}
+}));
+registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge() {
+@Override
+public Long value() {
+return timedOutHeartbeats();

Review Comment:
   good catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267252934


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -282,15 +294,15 @@ class SharedServer(
   setDisabledReason(snapshotsDisabledReason).
   setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
   build()
-_raftManager.register(loader)

Review Comment:
   Not really fixing a bug per se but it's probably slightly better for 
performance this way. Mainly, I did not want the `get()` call to be blocked 
behind another event.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267251882


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -282,15 +294,15 @@ class SharedServer(
   setDisabledReason(snapshotsDisabledReason).
   setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
   build()
-_raftManager.register(loader)
 try {
-  
loader.installPublishers(Collections.singletonList(snapshotGenerator))
+  loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()

Review Comment:
   yes, although I think it's a bug that's very unlikely to cause harm in 
practice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267251209


##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -259,15 +262,24 @@ class SharedServer(
 raftManager = _raftManager
 _raftManager.startup()
 
+metadataLoaderMetrics = if (brokerMetrics != null) {
+  new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
+batchSize => brokerMetrics.updateBatchSize(batchSize),
+brokerMetrics.lastAppliedImageProvenance)
+} else {
+  new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+_ => {},
+_ => {},
+new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))

Review Comment:
   There are a few broker-specific metrics hanging out in 
`BrokerServerMetrics.scala` and this is a way to connect 
`MetadataLoaderMetrics` to that class. So that the metadata loader only needs 
to interact with `MetadataLoaderMetrics` and not the broker-specific code.
   
   Long-term, we probably want to move all the loader metrics into 
`MetadataLoaderMetrics`, and make them all accessible on the controller as well 
as broker. But that's out of scope for this change (and would need a KIP anyway)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267248743


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -782,7 +735,7 @@ public void testTimeouts() throws Throwable {
 build()
 ) {
 QuorumController controller = controlEnv.activeController();
-CountDownLatch countDownLatch = controller.pause();
+CountDownLatch countDownLatch = pause(controller);

Review Comment:
   yeah, we can remove this and just use the test function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267249264


##
metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java:
##
@@ -140,39 +142,6 @@ public class QuorumControllerTest {
 static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
 fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided 
bootstrap");
 
-static class MockControllerMetrics extends QuorumControllerMetrics {
-final AtomicBoolean closed = new AtomicBoolean(false);
-
-MockControllerMetrics() {
-super(Optional.empty(), Time.SYSTEM, false);
-}
-
-@Override
-public void close() {
-super.close();
-closed.set(true);
-}
-}
-
-/**
- * Test creating a new QuorumController and closing it.
- */
-@Test
-public void testCreateAndClose() throws Throwable {
-MockControllerMetrics metrics = new MockControllerMetrics();
-try (
-LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
-build();
-QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
-setControllerBuilderInitializer(controllerBuilder -> {
-controllerBuilder.setMetrics(metrics);
-}).
-build()
-) {
-}
-assertTrue(metrics.closed.get(), "metrics were not closed");
-}

Review Comment:
   This was moved to 
`QuorumControllerMetricsIntegrationTest.testClosingQuorumControllerClosesMetrics`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #14010: KAFKA-15183: Add more controller, loader, snapshot emitter metrics

2023-07-18 Thread via GitHub


cmccabe commented on code in PR #14010:
URL: https://github.com/apache/kafka/pull/14010#discussion_r1267246622


##
metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java:
##
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.apache.kafka.image.MetadataProvenance;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class MetadataLoaderMetricsTest {
+private static class FakeMetadataLoaderMetrics implements AutoCloseable {
+final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
+final AtomicInteger batchSize = new AtomicInteger(0);
+final AtomicReference provenance =
+new AtomicReference<>(MetadataProvenance.EMPTY);
+final MetadataLoaderMetrics metrics;
+
+FakeMetadataLoaderMetrics() {
+this(Optional.empty());
+}
+
+FakeMetadataLoaderMetrics(MetricsRegistry registry) {
+this(Optional.of(registry));
+}
+
+FakeMetadataLoaderMetrics(Optional registry) {
+metrics = new MetadataLoaderMetrics(
+registry,
+n -> batchProcessingTimeNs.set(n),
+n -> batchSize.set(n),
+provenance);
+}
+
+@Override
+public void close() {
+metrics.close();
+}
+}
+
+@Test
+public void testMetricNames() {
+MetricsRegistry registry = new MetricsRegistry();
+try {
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+new HashSet<>(Arrays.asList(
+
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+)));
+}
+ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+Collections.emptySet());
+} finally {
+registry.shutdown();
+}
+}
+
+@Test
+public void testUpdateBatchProcessingTimeNs() {
+MetricsRegistry registry = new MetricsRegistry();
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+fakeMetrics.metrics.updateBatchProcessingTimeNs(123L);
+assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get());
+}
+}
+
+@Test
+public void testUpdateBatchSize() {
+MetricsRegistry registry = new MetricsRegistry();
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+fakeMetrics.metrics.updateBatchSize(50);
+assertEquals(50, fakeMetrics.batchSize.get());
+}
+}
+
+@Test
+public void testUpdateLastAppliedImageProvenance() {
+MetricsRegistry registry = new MetricsRegistry();
+try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+fakeMetrics.metrics.updateLastAppliedImageProvenance(new 
MetadataProvenance(1L, 2, 3L));
+assertEquals(new MetadataProvenance(1L, 2, 3L), 
fakeMetrics.provenance.get());

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: