This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch KAFKA-15183 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2816888c74bc4f275f2abe94feaba7ad78b22599 Author: Colin P. McCabe <[email protected]> AuthorDate: Wed Jul 12 11:26:03 2023 -0700 KAFKA-15183: Add more controller, loader, snapshot emitter metrics Implement some of the metrics from KIP-938: Add more metrics for measuring KRaft performance. Add these metrics to QuorumControllerMetrics: kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount kafka.controller:type=KafkaController,name=NewActiveControllersCount Create LoaderMetrics with these new metrics: kafka.server:type=MetadataLoader,name=CurrentMetadataVersion kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount Create SnapshotEmitterMetrics with these new metrics: kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs --- checkstyle/import-control-metadata.xml | 16 +- .../src/main/scala/kafka/server/SharedServer.scala | 30 ++- .../server/metadata/BrokerServerMetrics.scala | 11 +- .../apache/kafka/controller/QuorumController.java | 54 ++++-- .../metrics/QuorumControllerMetrics.java | 83 +++++++- .../apache/kafka/image/loader/MetadataLoader.java | 66 ++++--- .../kafka/image/loader/MetadataLoaderMetrics.java | 46 ----- .../loader/metrics/MetadataLoaderMetrics.java | 137 +++++++++++++ .../publisher/metrics/SnapshotEmitterMetrics.java | 108 +++++++++++ .../QuorumControllerIntegrationTestUtils.java | 213 +++++++++++++++++++++ .../QuorumControllerMetricsIntegrationTest.java | 202 +++++++++++++++++++ .../kafka/controller/QuorumControllerTest.java | 122 +----------- .../metrics/QuorumControllerMetricsTest.java | 9 +- .../kafka/image/loader/MetadataLoaderTest.java | 24 +++ .../loader/metrics/MetadataLoaderMetricsTest.java | 148 ++++++++++++++ 15 files changed, 1042 insertions(+), 227 deletions(-) diff --git 
a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index 2cbb5504293..464006f0e38 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -63,7 +63,6 @@ </subpackage> <subpackage name="controller"> - <allow pkg="com.yammer.metrics"/> <allow pkg="org.apache.kafka.clients" /> <allow pkg="org.apache.kafka.clients.admin" /> <allow pkg="org.apache.kafka.common.acl" /> @@ -73,7 +72,6 @@ <allow pkg="org.apache.kafka.common.internals" /> <allow pkg="org.apache.kafka.common.message" /> <allow pkg="org.apache.kafka.common.metadata" /> - <allow pkg="org.apache.kafka.common.metrics" /> <allow pkg="org.apache.kafka.common.network" /> <allow pkg="org.apache.kafka.common.protocol" /> <allow pkg="org.apache.kafka.common.quota" /> @@ -93,13 +91,17 @@ <allow pkg="org.apache.kafka.server.common" /> <allow pkg="org.apache.kafka.server.config" /> <allow pkg="org.apache.kafka.server.fault" /> - <allow pkg="org.apache.kafka.server.metrics" /> <allow pkg="org.apache.kafka.server.mutable" /> <allow pkg="org.apache.kafka.server.policy"/> <allow pkg="org.apache.kafka.server.util"/> <allow pkg="org.apache.kafka.snapshot" /> <allow pkg="org.apache.kafka.test" /> <allow pkg="org.apache.kafka.timeline" /> + <subpackage name="metrics"> + <allow pkg="com.yammer.metrics"/> + <allow pkg="org.apache.kafka.common.metrics" /> + <allow pkg="org.apache.kafka.server.metrics" /> + </subpackage> </subpackage> <subpackage name="image"> @@ -122,6 +124,14 @@ <allow pkg="org.apache.kafka.server.util" /> <allow pkg="org.apache.kafka.snapshot" /> <allow pkg="org.apache.kafka.test" /> + <subpackage name="loader"> + <subpackage name="metrics"> + <allow pkg="com.yammer.metrics"/> + <allow pkg="org.apache.kafka.common.metrics" /> + <allow pkg="org.apache.kafka.controller.metrics" /> + <allow pkg="org.apache.kafka.server.metrics" /> + </subpackage> + </subpackage> </subpackage> <subpackage name="metadata"> diff --git 
a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index e58b33a8d57..892c528885a 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -25,7 +25,9 @@ import kafka.utils.{CoreUtils, Logging} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time} import org.apache.kafka.controller.metrics.ControllerMetadataMetrics +import org.apache.kafka.image.MetadataProvenance import org.apache.kafka.image.loader.MetadataLoader +import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator} import org.apache.kafka.metadata.MetadataRecordSerde import org.apache.kafka.raft.RaftConfig.AddressSpec @@ -34,7 +36,7 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process import org.apache.kafka.server.metrics.KafkaYammerMetrics import java.util -import java.util.{Collections, Optional} +import java.util.Optional import java.util.concurrent.{CompletableFuture, TimeUnit} import java.util.concurrent.atomic.AtomicReference @@ -106,6 +108,7 @@ class SharedServer( val snapshotsDisabledReason = new AtomicReference[String](null) @volatile var snapshotEmitter: SnapshotEmitter = _ @volatile var snapshotGenerator: SnapshotGenerator = _ + @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _ def isUsed(): Boolean = synchronized { usedByController || usedByBroker @@ -259,15 +262,24 @@ class SharedServer( raftManager = _raftManager _raftManager.startup() + metadataLoaderMetrics = if (brokerMetrics != null) { + new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()), + elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs), + batchSize => brokerMetrics.updateBatchSize(batchSize), + brokerMetrics.lastAppliedImageProvenance) + } else { + new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()), + _ => {}, + _ => {}, + new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY)) + } val loaderBuilder = new MetadataLoader.Builder(). setNodeId(metaProps.nodeId). setTime(time). setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). setFaultHandler(metadataLoaderFaultHandler). - setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()) - if (brokerMetrics != null) { - loaderBuilder.setMetadataLoaderMetrics(brokerMetrics) - } + setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()). + setMetrics(metadataLoaderMetrics) loader = loaderBuilder.build() snapshotEmitter = new SnapshotEmitter.Builder(). setNodeId(metaProps.nodeId). @@ -282,15 +294,15 @@ class SharedServer( setDisabledReason(snapshotsDisabledReason). setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-"). build() - _raftManager.register(loader) try { - loader.installPublishers(Collections.singletonList(snapshotGenerator)) + loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get() } catch { case t: Throwable => { error("Unable to install metadata publishers", t) throw new RuntimeException("Unable to install metadata publishers.", t) } } + _raftManager.register(loader) debug("Completed SharedServer startup.") started = true } catch { @@ -326,6 +338,10 @@ class SharedServer( CoreUtils.swallow(loader.close(), this) loader = null } + if (metadataLoaderMetrics != null) { + CoreUtils.swallow(metadataLoaderMetrics.close(), this) + metadataLoaderMetrics = null + } if (snapshotGenerator != null) { CoreUtils.swallow(snapshotGenerator.close(), this) snapshotGenerator = null diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala index 212909101f2..ff183324166 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala +++ 
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala @@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.Gauge import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.metrics.MetricConfig import org.apache.kafka.image.MetadataProvenance -import org.apache.kafka.image.loader.MetadataLoaderMetrics import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import java.util.Collections @@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS final class BrokerServerMetrics private ( metrics: Metrics -) extends MetadataLoaderMetrics { +) extends AutoCloseable { import BrokerServerMetrics._ private val batchProcessingTimeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server", @@ -123,15 +122,15 @@ final class BrokerServerMetrics private ( ).foreach(metrics.removeMetric) } - override def updateBatchProcessingTime(elapsedNs: Long): Unit = + def updateBatchProcessingTime(elapsedNs: Long): Unit = batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs)) - override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size) + def updateBatchSize(size: Int): Unit = batchSizeHist.update(size) - override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit = + def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit = lastAppliedImageProvenance.set(provenance) - override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset() + def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset() def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs() } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 582d774e6e9..2f32c3638df 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -461,6 +461,12 @@ public final class QuorumController implements Controller { private Throwable handleEventException(String name, OptionalLong startProcessingTimeNs, Throwable exception) { + if (!startProcessingTimeNs.isPresent() && + ControllerExceptions.isTimeoutException(exception)) { + // If the event never started, and the exception is a timeout, increment the timed + // out metric. + controllerMetrics.incrementOperationsTimedOut(); + } Throwable externalException = ControllerExceptions.toExternalException(exception, () -> latestController()); if (!startProcessingTimeNs.isPresent()) { @@ -492,6 +498,15 @@ public final class QuorumController implements Controller { return externalException; } + private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) { + long now = time.nanoseconds(); + controllerMetrics.incrementOperationsStarted(); + if (eventCreatedTimeNs.isPresent()) { + controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong())); + } + return now; + } + /** * A controller event for handling internal state changes, such as Raft inputs. 
*/ @@ -508,9 +523,8 @@ public final class QuorumController implements Controller { @Override public void run() throws Exception { - long now = time.nanoseconds(); - controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); - startProcessingTimeNs = OptionalLong.of(now); + startProcessingTimeNs = OptionalLong.of( + updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs))); log.debug("Executing {}.", this); handler.run(); handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong()); @@ -527,11 +541,16 @@ public final class QuorumController implements Controller { } } - private void appendControlEvent(String name, Runnable handler) { + void appendControlEvent(String name, Runnable handler) { ControllerEvent event = new ControllerEvent(name, handler); queue.append(event); } + void appendControlEventWithDeadline(String name, Runnable handler, long deadlineNs) { + ControllerEvent event = new ControllerEvent(name, handler); + queue.appendWithDeadline(deadlineNs, event); + } + /** * A controller event that reads the committed internal state in order to expose it * to an API. 
@@ -555,9 +574,8 @@ public final class QuorumController implements Controller { @Override public void run() throws Exception { - long now = time.nanoseconds(); - controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); - startProcessingTimeNs = OptionalLong.of(now); + startProcessingTimeNs = OptionalLong.of( + updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs))); T value = handler.get(); handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong()); future.complete(value); @@ -692,12 +710,11 @@ public final class QuorumController implements Controller { @Override public void run() throws Exception { - long now = time.nanoseconds(); - if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) { - // We exclude deferred events from the event queue time metric to prevent - // incorrectly including the deferral time in the queue time. - controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); - } + // Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly + // including their deferral time in the event queue time. + startProcessingTimeNs = OptionalLong.of( + updateEventStartMetricsAndGetTime(flags.contains(DOES_NOT_UPDATE_QUEUE_TIME) ? + OptionalLong.empty() : OptionalLong.of(eventCreatedTimeNs))); int controllerEpoch = curClaimEpoch; if (!isActiveController(controllerEpoch)) { throw ControllerExceptions.newWrongControllerException(latestController()); @@ -706,7 +723,6 @@ public final class QuorumController implements Controller { log.info("Cannot run write operation {} in pre-migration mode. 
Returning NOT_CONTROLLER.", name); throw ControllerExceptions.newPreMigrationException(latestController()); } - startProcessingTimeNs = OptionalLong.of(now); ControllerResult<T> result = op.generateRecordsAndResult(); if (result.records().isEmpty()) { op.processBatchEndOffset(writeOffset); @@ -1063,6 +1079,9 @@ public final class QuorumController implements Controller { appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> { final String newLeaderName = newLeader.leaderId().isPresent() ? String.valueOf(newLeader.leaderId().getAsInt()) : "(none)"; + if (newLeader.leaderId().isPresent()) { + controllerMetrics.incrementNewActiveControllers(); + } if (isActiveController()) { if (newLeader.isLeader(nodeId)) { log.warn("We were the leader in epoch {}, and are still the leader " + @@ -1308,7 +1327,7 @@ public final class QuorumController implements Controller { } } - private void renounce() { + void renounce() { try { if (curClaimEpoch == -1) { throw new RuntimeException("Cannot renounce leadership because we are not the " + @@ -2307,4 +2326,9 @@ public final class QuorumController implements Controller { Time time() { return time; } + + // VisibleForTesting + QuorumControllerMetrics controllerMetrics() { + return controllerMetrics; + } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java index 566245ed096..a95fa11d1c1 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java +++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java @@ -52,6 +52,14 @@ public class QuorumControllerMetrics implements AutoCloseable { "KafkaController", "LastAppliedRecordTimestamp"); private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName( "KafkaController", "LastAppliedRecordLagMs"); + private final static MetricName 
TIMED_OUT_BROKER_HEARTBEAT_COUNT = getMetricName( + "KafkaController", "TimedOutBrokerHeartbeatCount"); + private final static MetricName EVENT_QUEUE_OPERATIONS_STARTED_COUNT = getMetricName( + "KafkaController", "EventQueueOperationsStartedCount"); + private final static MetricName EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT = getMetricName( + "KafkaController", "EventQueueOperationsTimedOutCount"); + private final static MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName( + "KafkaController", "NewActiveControllersCount"); private final Optional<MetricsRegistry> registry; private volatile boolean active; @@ -61,6 +69,9 @@ private final Consumer<Long> eventQueueTimeUpdater; private final Consumer<Long> eventQueueProcessingTimeUpdater; private final AtomicLong timedOutHeartbeats = new AtomicLong(0); + private final AtomicLong operationsStarted = new AtomicLong(0); + private final AtomicLong operationsTimedOut = new AtomicLong(0); + private final AtomicLong newActiveControllers = new AtomicLong(0); private Consumer<Long> newHistogram(MetricName name, boolean biased) { if (registry.isPresent()) { @@ -109,6 +120,30 @@ return time.milliseconds() - lastAppliedRecordTimestamp(); } })); + registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() { + @Override + public Long value() { + return timedOutHeartbeats(); + } + })); + registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() { + @Override + public Long value() { + return operationsStarted(); + } + })); + registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() { + @Override + public Long value() { + return operationsTimedOut(); + } + })); + registry.ifPresent(r -> r.newGauge(NEW_ACTIVE_CONTROLLERS_COUNT, new Gauge<Long>() { + @Override + public Long value() { + return newActiveControllers(); 
+ } + })); } public void setActive(boolean active) { @@ -152,17 +187,53 @@ public class QuorumControllerMetrics implements AutoCloseable { } public void incrementTimedOutHeartbeats() { - timedOutHeartbeats.addAndGet(1); + timedOutHeartbeats.incrementAndGet(); } - public void setTimedOutHeartbeats(long heartbeats) { - timedOutHeartbeats.set(heartbeats); + public void setTimedOutHeartbeats(long value) { + timedOutHeartbeats.set(value); } public long timedOutHeartbeats() { return timedOutHeartbeats.get(); } + public void incrementOperationsStarted() { + operationsStarted.incrementAndGet(); + } + + public void setOperationsStarted(long value) { + operationsStarted.set(value); + } + + public long operationsStarted() { + return operationsStarted.get(); + } + + public void incrementOperationsTimedOut() { + operationsTimedOut.incrementAndGet(); + } + + public void setOperationsTimedOut(long value) { + operationsTimedOut.set(value); + } + + public long operationsTimedOut() { + return operationsTimedOut.get(); + } + + public void incrementNewActiveControllers() { + newActiveControllers.incrementAndGet(); + } + + public void setNewActiveControllers(long value) { + newActiveControllers.set(value); + } + + public long newActiveControllers() { + return newActiveControllers.get(); + } + @Override public void close() { registry.ifPresent(r -> Arrays.asList( @@ -172,7 +243,11 @@ public class QuorumControllerMetrics implements AutoCloseable { LAST_APPLIED_RECORD_OFFSET, LAST_COMMITTED_RECORD_OFFSET, LAST_APPLIED_RECORD_TIMESTAMP, - LAST_APPLIED_RECORD_LAG_MS + LAST_APPLIED_RECORD_LAG_MS, + TIMED_OUT_BROKER_HEARTBEAT_COUNT, + EVENT_QUEUE_OPERATIONS_STARTED_COUNT, + EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, + NEW_ACTIVE_CONTROLLERS_COUNT ).forEach(r::removeMetric)); } diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java index 768fcb2574b..c2c066418b9 100644 --- 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java +++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics; import org.apache.kafka.image.publisher.MetadataPublisher; import org.apache.kafka.image.writer.ImageReWriter; import org.apache.kafka.image.writer.ImageWriterOptions; @@ -35,14 +36,17 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.server.fault.FaultHandlerException; import org.apache.kafka.snapshot.SnapshotReader; +import org.apache.kafka.snapshot.Snapshots; import org.slf4j.Logger; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -69,28 +73,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> private Time time = Time.SYSTEM; private LogContext logContext = null; private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e); - private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() { - private volatile long lastAppliedOffset = -1L; - - @Override - public void updateBatchProcessingTime(long elapsedNs) { } - - @Override - public void updateBatchSize(int size) { } - - @Override - public void updateLastAppliedImageProvenance(MetadataProvenance provenance) { - this.lastAppliedOffset = provenance.lastContainedOffset(); - } - - @Override - public long lastAppliedOffset() { - return 
lastAppliedOffset; - } - - @Override - public void close() throws Exception { } - }; + private MetadataLoaderMetrics metrics = null; private Supplier<OptionalLong> highWaterMarkAccessor = null; public Builder setNodeId(int nodeId) { @@ -113,13 +96,13 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> return this; } - public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) { - this.metrics = metrics; + public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) { + this.highWaterMarkAccessor = highWaterMarkAccessor; return this; } - public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) { - this.highWaterMarkAccessor = highWaterMarkAccessor; + public Builder setMetrics(MetadataLoaderMetrics metrics) { + this.metrics = metrics; return this; } @@ -130,6 +113,12 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> if (highWaterMarkAccessor == null) { throw new RuntimeException("You must set the high water mark accessor."); } + if (metrics == null) { + metrics = new MetadataLoaderMetrics(Optional.empty(), + __ -> { }, + __ -> { }, + new AtomicReference<>(MetadataProvenance.EMPTY)); + } return new MetadataLoader( time, logContext, @@ -221,6 +210,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> new ShutdownEvent()); } + // VisibleForTesting + MetadataLoaderMetrics metrics() { + return metrics; + } + private boolean stillNeedToCatchUp(String where, long offset) { if (!catchingUp) { log.trace("{}: we are not in the initial catching up state.", where); @@ -349,6 +343,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> } } metrics.updateLastAppliedImageProvenance(image.provenance()); + if (delta.featuresDelta() != null) { + metrics.setCurrentMetadataVersion(image.features().metadataVersion()); + } if (uninitializedPublishers.isEmpty()) { 
scheduleInitializeNewPublishers(0); } @@ -406,7 +403,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs); long elapsedNs = time.nanoseconds() - startNs; - metrics.updateBatchProcessingTime(elapsedNs); + metrics.updateBatchProcessingTimeNs(elapsedNs); return new LogDeltaManifest(provenance, currentLeaderAndEpoch, numBatches, @@ -418,24 +415,30 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { eventQueue.append(() -> { try { + long numLoaded = metrics.incrementHandleLoadSnapshotCount(); + String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId()); + log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.", + snapshotName, numLoaded); MetadataDelta delta = new MetadataDelta.Builder(). setImage(image). build(); SnapshotManifest manifest = loadSnapshot(delta, reader); - log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " + - "in {} us.", manifest.provenance().lastContainedOffset(), + log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " + + "and this snapshot in {} us.", snapshotName, + image.provenance().lastContainedOffset(), NANOSECONDS.toMicros(manifest.elapsedNs())); try { image = delta.apply(manifest.provenance()); } catch (Throwable e) { faultHandler.handleFault("Error generating new metadata image from " + - "snapshot at offset " + reader.lastContainedLogOffset(), e); + "snapshot " + snapshotName, e); return; } if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) { return; } - log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance()); + log.info("handleLoadSnapshot({}): publishing new snapshot image to {} publisher(s).", + snapshotName, 
publishers.size()); for (MetadataPublisher publisher : publishers.values()) { try { publisher.onMetadataUpdate(delta, image, manifest); @@ -446,6 +449,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion> } } metrics.updateLastAppliedImageProvenance(image.provenance()); + metrics.setCurrentMetadataVersion(image.features().metadataVersion()); if (uninitializedPublishers.isEmpty()) { scheduleInitializeNewPublishers(0); } diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java deleted file mode 100644 index 654bc9dd505..00000000000 --- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.image.loader; - -import org.apache.kafka.image.MetadataProvenance; - - -/** - * An interface for the metadata loader metrics. - */ -public interface MetadataLoaderMetrics extends AutoCloseable { - /** - * Update the batch processing time histogram. - */ - void updateBatchProcessingTime(long elapsedNs); - - /** - * Update the batch size histogram. 
- */ - void updateBatchSize(int size); - - /** - * Set the provenance of the last image which has been processed by all publishers. - */ - void updateLastAppliedImageProvenance(MetadataProvenance provenance); - - /** - * Retrieve the last offset which has been processed by all publishers. - */ - long lastAppliedOffset(); -} diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java new file mode 100644 index 00000000000..351ac4fc5c3 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image.loader.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; + +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * These are the metrics which are managed by the MetadataLoader class. + */ +public final class MetadataLoaderMetrics implements AutoCloseable { + private final static MetricName CURRENT_METADATA_VERSION = getMetricName( + "MetadataLoader", "CurrentMetadataVersion"); + private final static MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName( + "MetadataLoader", "HandleLoadSnapshotCount"); + + private final Optional<MetricsRegistry> registry; + private final AtomicReference<MetadataVersion> currentMetadataVersion = + new AtomicReference<>(MetadataVersion.MINIMUM_KRAFT_VERSION); + private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0); + private final Consumer<Long> batchProcessingTimeNsUpdater; + private final Consumer<Integer> batchSizesUpdater; + private final AtomicReference<MetadataProvenance> lastAppliedProvenance; + + /** + * Create a new LoaderMetrics object. + * + * @param registry The metrics registry, or Optional.empty if this is a + * test and we don't have one. + * @param batchProcessingTimeNsUpdater Updates the batch processing time histogram. + * @param batchSizesUpdater Updates the batch sizes histogram. 
+ */ + public MetadataLoaderMetrics( + Optional<MetricsRegistry> registry, + Consumer<Long> batchProcessingTimeNsUpdater, + Consumer<Integer> batchSizesUpdater, + AtomicReference<MetadataProvenance> lastAppliedProvenance + ) { + this.registry = registry; + this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater; + this.batchSizesUpdater = batchSizesUpdater; + this.lastAppliedProvenance = lastAppliedProvenance; + registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new Gauge<Integer>() { + @Override + public Integer value() { + return Integer.valueOf(currentMetadataVersion().featureLevel()); + } + })); + registry.ifPresent(r -> r.newGauge(HANDLE_LOAD_SNAPSHOT_COUNT, new Gauge<Long>() { + @Override + public Long value() { + return handleLoadSnapshotCount(); + } + })); + } + + /** + * Update the batch processing time histogram. + */ + public void updateBatchProcessingTimeNs(long elapsedNs) { + batchProcessingTimeNsUpdater.accept(elapsedNs); + } + + /** + * Update the batch size histogram. + */ + public void updateBatchSize(int size) { + batchSizesUpdater.accept(size); + } + + /** + * Set the provenance of the last image which has been processed by all publishers. + */ + public void updateLastAppliedImageProvenance(MetadataProvenance lastAppliedProvenance) { + this.lastAppliedProvenance.set(lastAppliedProvenance); + } + + /** + * Retrieve the last offset which has been processed by all publishers. 
+ */ + public long lastAppliedOffset() { + return this.lastAppliedProvenance.get().lastContainedOffset(); + } + + public void setCurrentMetadataVersion(MetadataVersion metadataVersion) { + this.currentMetadataVersion.set(metadataVersion); + } + + public MetadataVersion currentMetadataVersion() { + return this.currentMetadataVersion.get(); + } + + public long incrementHandleLoadSnapshotCount() { + return this.handleLoadSnapshotCount.incrementAndGet(); + } + + public long handleLoadSnapshotCount() { + return this.handleLoadSnapshotCount.get(); + } + + @Override + public void close() { + registry.ifPresent(r -> Arrays.asList( + CURRENT_METADATA_VERSION, + HANDLE_LOAD_SNAPSHOT_COUNT + ).forEach(r::removeMetric)); + } + + private static MetricName getMetricName(String type, String name) { + return KafkaYammerMetrics.getMetricName("kafka.server", type, name); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java new file mode 100644 index 00000000000..0a2cd308463 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image.loader.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; + +import java.util.Arrays; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * These are the metrics which are managed by the SnapshotEmitter class. + */ +public final class SnapshotEmitterMetrics implements AutoCloseable { + private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = getMetricName( + "SnapshotEmitter", "LatestSnapshotGeneratedBytes"); + private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = getMetricName( + "SnapshotEmitter", "LatestSnapshotGeneratedAgeMs"); + + private final Optional<MetricsRegistry> registry; + private final Time time; + private final AtomicLong latestSnapshotGeneratedBytes; + private final AtomicLong latestSnapshotGeneratedTimeMs; + + /** + * Create a new LoaderMetrics object. + * + * @param registry The metrics registry, or Optional.empty if this is a test and we don't have one. 
+ */ + public SnapshotEmitterMetrics( + Optional<MetricsRegistry> registry, + Time time, + long initialLatestSnapshotGeneratedBytes + ) { + this.registry = registry; + this.time = time; + this.latestSnapshotGeneratedBytes = new AtomicLong(initialLatestSnapshotGeneratedBytes); + this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs()); + registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, new Gauge<Long>() { + @Override + public Long value() { + return latestSnapshotGeneratedBytes(); + } + })); + registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, new Gauge<Long>() { + @Override + public Long value() { + return latestSnapshotGeneratedAgeMs(); + } + })); + } + + long monoTimeInMs() { + return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds()); + } + + public void setLatestSnapshotGeneratedBytes(long value) { + this.latestSnapshotGeneratedBytes.set(value); + } + + public long latestSnapshotGeneratedBytes() { + return this.latestSnapshotGeneratedBytes.get(); + } + + public void setLatestSnapshotGeneratedTimeMs(long timeMs) { + this.latestSnapshotGeneratedTimeMs.set(timeMs); + } + + public long latestSnapshotGeneratedTimeMs() { + return this.latestSnapshotGeneratedTimeMs.get(); + } + + public long latestSnapshotGeneratedAgeMs() { + return time.milliseconds() - this.latestSnapshotGeneratedTimeMs.get(); + } + + @Override + public void close() { + registry.ifPresent(r -> Arrays.asList( + LATEST_SNAPSHOT_GENERATED_BYTES, + LATEST_SNAPSHOT_GENERATED_AGE_MS + ).forEach(r::removeMetric)); + } + + private static MetricName getMetricName(String type, String name) { + return KafkaYammerMetrics.getMetricName("kafka.server", type, name); + } +} diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java new file mode 100644 index 00000000000..83758c0fff1 --- /dev/null +++ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.controller;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.server.common.MetadataVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
import static org.junit.jupiter.api.Assertions.assertEquals;


/**
 * Utility functions for use in QuorumController integration tests.
 */
class QuorumControllerIntegrationTestUtils {
    private final static Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);

    // BUG FIX: this overload was an instance method in a class that is otherwise used
    // purely statically (every other member is static and callers use static imports).
    // It must be static so it is callable without instantiating the utility class.
    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
    }

    /**
     * Create a broker features collection for use in a registration request. We only set MV here.
     *
     * @param minVersion The minimum supported MV.
     * @param maxVersion The maximum supported MV.
     */
    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
        MetadataVersion minVersion,
        MetadataVersion maxVersion
    ) {
        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
        features.add(new BrokerRegistrationRequestData.Feature()
            .setName(MetadataVersion.FEATURE_NAME)
            .setMinSupportedVersion(minVersion.featureLevel())
            .setMaxSupportedVersion(maxVersion.featureLevel()));
        return features;
    }

    /**
     * Register the given number of brokers and unfence them via a heartbeat.
     *
     * @param controller The active controller.
     * @param numBrokers The number of brokers to register. We will start at 0 and increment.
     *
     * @return A map from broker IDs to broker epochs.
     */
    static Map<Integer, Long> registerBrokersAndUnfence(
        QuorumController controller,
        int numBrokers
    ) throws Exception {
        Map<Integer, Long> brokerEpochs = new HashMap<>();
        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
            BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
                new BrokerRegistrationRequestData()
                    .setBrokerId(brokerId)
                    .setRack(null)
                    .setClusterId(controller.clusterId())
                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
                    .setListeners(new ListenerCollection(
                        Arrays.asList(
                            new Listener()
                                .setName("PLAINTEXT")
                                .setHost("localhost")
                                .setPort(9092 + brokerId)
                        ).iterator()
                        )
                    )
            ).get();
            brokerEpochs.put(brokerId, reply.epoch());

            // Send heartbeat to unfence
            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
                new BrokerHeartbeatRequestData()
                    .setWantFence(false)
                    .setBrokerEpoch(brokerEpochs.get(brokerId))
                    .setBrokerId(brokerId)
                    .setCurrentMetadataOffset(100000L)
            ).get();
        }

        return brokerEpochs;
    }

    /**
     * Send broker heartbeats for the provided brokers, asserting that each broker
     * ends up unfenced and not asked to shut down.
     *
     * @param controller The active controller.
     * @param brokers The broker IDs to send heartbeats for.
     * @param brokerEpochs A map from broker ID to broker epoch.
     */
    static void sendBrokerHeartbeat(
        QuorumController controller,
        List<Integer> brokers,
        Map<Integer, Long> brokerEpochs
    ) throws Exception {
        // Note: an explicit isEmpty guard is unnecessary; the loop body simply does not
        // run for an empty list.
        for (Integer brokerId : brokers) {
            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
                new BrokerHeartbeatRequestData()
                    .setWantFence(false)
                    .setBrokerEpoch(brokerEpochs.get(brokerId))
                    .setBrokerId(brokerId)
                    .setCurrentMetadataOffset(100000)
            ).get();
            assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
        }
    }

    /**
     * Create some topics directly on the controller.
     *
     * @param controller The active controller.
     * @param prefix The prefix to use for topic names.
     * @param numTopics The number of topics to create.
     * @param replicationFactor The replication factor to use.
     */
    static void createTopics(
        QuorumController controller,
        String prefix,
        int numTopics,
        int replicationFactor
    ) throws Exception {
        HashSet<String> describable = new HashSet<>();
        for (int i = 0; i < numTopics; i++) {
            describable.add(prefix + i);
        }
        CreateTopicsRequestData request = new CreateTopicsRequestData();
        for (int i = 0; i < numTopics; i++) {
            request.topics().add(
                new CreatableTopic().
                    setName(prefix + i).
                    setNumPartitions(-1).
                    setReplicationFactor((short) replicationFactor));
        }
        CreateTopicsResponseData response =
            controller.createTopics(ANONYMOUS_CONTEXT, request, describable).get();
        for (int i = 0; i < numTopics; i++) {
            CreatableTopicResult result = response.topics().find(prefix + i);
            assertEquals((short) 0, result.errorCode());
        }
    }

    /**
     * Add an event to the controller event queue that will pause it temporarily.
     *
     * @param controller The controller.
     * @return The latch that can be used to unpause the controller.
     */
    static CountDownLatch pause(QuorumController controller) {
        final CountDownLatch latch = new CountDownLatch(1);
        controller.appendControlEvent("pause", () -> {
            try {
                latch.await();
            } catch (InterruptedException e) {
                log.info("Interrupted while waiting for unpause.", e);
            }
        });
        return latch;
    }

    /**
     * Force the current controller to renounce leadership.
     *
     * @param controller The controller.
     */
    static void forceRenounce(QuorumController controller) throws Exception {
        CompletableFuture<Void> future = new CompletableFuture<>();
        controller.appendControlEvent("forceRenounce", () -> {
            controller.renounce();
            future.complete(null);
        });
        future.get();
    }
}
 */

package org.apache.kafka.controller;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;


/**
 * Integration tests for the QuorumController metrics added by KIP-938: timed-out
 * heartbeats, event-queue operations started / timed out, and new-active-controller
 * counts.
 */
@Timeout(value = 40)
public class QuorumControllerMetricsIntegrationTest {
    private final static Logger log = LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);

    /**
     * A QuorumControllerMetrics subclass that records whether close() was invoked,
     * so tests can verify that the controller closes its metrics object.
     */
    static class MockControllerMetrics extends QuorumControllerMetrics {
        // Set to true once close() has been called.
        final AtomicBoolean closed = new AtomicBoolean(false);

        MockControllerMetrics() {
            // No registry: these metrics are not published anywhere during the test.
            super(Optional.empty(), Time.SYSTEM);
        }

        @Override
        public void close() {
            super.close();
            closed.set(true);
        }
    }

    /**
     * Test that closing the QuorumController closes the metrics object.
     */
    @Test
    public void testClosingQuorumControllerClosesMetrics() throws Throwable {
        MockControllerMetrics metrics = new MockControllerMetrics();
        try (
            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
                build();
            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
                setControllerBuilderInitializer(controllerBuilder -> {
                    controllerBuilder.setMetrics(metrics);
                }).
                build()
        ) {
            // With a single controller there has been exactly one leadership change.
            assertEquals(1, controlEnv.activeController().controllerMetrics().newActiveControllers());
        }
        // The try-with-resources close above must have propagated to the metrics object.
        assertTrue(metrics.closed.get(), "metrics were not closed");
    }

    /**
     * Test that failing over to a new controller increments NewActiveControllersCount on both the
     * active and inactive controllers.
     */
    @Test
    public void testFailingOverIncrementsNewActiveControllerCount() throws Throwable {
        try (
            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
                build();
            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
                build()
        ) {
            controlEnv.activeController(); // wait for a controller to become active.
            // Retry because the non-active controllers may observe the leadership
            // change slightly later than the active one.
            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                for (QuorumController controller : controlEnv.controllers()) {
                    assertEquals(1, controller.controllerMetrics().newActiveControllers());
                }
            });
            forceRenounce(controlEnv.activeController());
            // After the renounce, every controller should eventually count two
            // leadership changes.
            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                for (QuorumController controller : controlEnv.controllers()) {
                    assertEquals(2, controller.controllerMetrics().newActiveControllers());
                }
            });
        }
    }

    /**
     * Test the heartbeat and general operation timeout metrics.
     * These are incremented on the active controller only.
     */
    @Test
    public void testTimeoutMetrics() throws Throwable {
        try (
            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
                build();
            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
                build()
        ) {
            QuorumController active = controlEnv.activeController();
            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
            assertEquals(0L, active.controllerMetrics().timedOutHeartbeats());
            assertEquals(0L, active.controllerMetrics().operationsTimedOut());

            // We pause the controller so that the heartbeat event will definitely be expired
            // rather than processed.
            CountDownLatch latch = pause(active);
            // The deadline is "now", so by the time the controller is unpaused the
            // request has already expired.
            ControllerRequestContext expiredTimeoutContext = new ControllerRequestContext(
                new RequestHeaderData(),
                KafkaPrincipal.ANONYMOUS,
                OptionalLong.of(active.time().nanoseconds()));
            CompletableFuture<BrokerHeartbeatReply> replyFuture =
                active.processBrokerHeartbeat(expiredTimeoutContext,
                    new BrokerHeartbeatRequestData()
                        .setWantFence(false)
                        .setBrokerEpoch(brokerEpochs.get(0))
                        .setBrokerId(0)
                        .setCurrentMetadataOffset(100000));
            latch.countDown(); // Unpause the controller.
            // The expired heartbeat must fail with a TimeoutException...
            assertEquals(TimeoutException.class,
                assertThrows(ExecutionException.class, () -> replyFuture.get()).
                    getCause().getClass());
            // ... and increment both the heartbeat-specific and the general timeout counters.
            assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
            assertEquals(1L, active.controllerMetrics().operationsTimedOut());

            // Inject a new timed out operation.
            CountDownLatch latch2 = pause(active);
            active.appendControlEventWithDeadline("fakeTimeoutOperation",
                () -> { },
                active.time().nanoseconds());
            latch2.countDown();
            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                // The fake timeout increments operationsTimedOut but not timedOutHeartbeats.
                assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
                assertEquals(2L, active.controllerMetrics().operationsTimedOut());
            });
            for (QuorumController controller : controlEnv.controllers()) {
                // Inactive controllers don't set these metrics.
                if (!controller.isActive()) {
                    assertEquals(false, controller.controllerMetrics().active());
                    assertEquals(0L, controller.controllerMetrics().timedOutHeartbeats());
                    assertEquals(0L, controller.controllerMetrics().operationsTimedOut());
                }
            }
        }
    }

    /**
     * Test the event queue operations started metric.
     */
    @Test
    public void testEventQueueOperationsStartedMetric() throws Throwable {
        try (
            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
                build();
            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
                build()
        ) {
            QuorumController active = controlEnv.activeController();
            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);

            // Test that a new operation increments operationsStarted. We retry this if needed
            // to handle the case where another operation is performed in between loading
            // expectedOperationsStarted and running the new control event.
            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                long expectedOperationsStarted = active.controllerMetrics().operationsStarted() + 1;
                CompletableFuture<Long> actualOperationsStarted = new CompletableFuture<>();
                // Read the counter from inside the event itself so we observe the value
                // after this event has been started.
                active.appendControlEvent("checkOperationsStarted", () -> {
                    actualOperationsStarted.complete(active.controllerMetrics().operationsStarted());
                });
                assertEquals(expectedOperationsStarted, actualOperationsStarted.get());
            });
        }
    }
}
java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -88,7 +87,6 @@ import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker; -import org.apache.kafka.controller.metrics.QuorumControllerMetrics; import org.apache.kafka.metadata.BrokerHeartbeatReply; import org.apache.kafka.metadata.BrokerRegistrationFencingChange; import org.apache.kafka.metadata.BrokerRegistrationReply; @@ -124,6 +122,10 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA; import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry; import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence; +import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -140,39 +142,6 @@ public class QuorumControllerTest { static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata. 
fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap"); - static class MockControllerMetrics extends QuorumControllerMetrics { - final AtomicBoolean closed = new AtomicBoolean(false); - - MockControllerMetrics() { - super(Optional.empty(), Time.SYSTEM); - } - - @Override - public void close() { - super.close(); - closed.set(true); - } - } - - /** - * Test creating a new QuorumController and closing it. - */ - @Test - public void testCreateAndClose() throws Throwable { - MockControllerMetrics metrics = new MockControllerMetrics(); - try ( - LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1). - build(); - QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv). - setControllerBuilderInitializer(controllerBuilder -> { - controllerBuilder.setMetrics(metrics); - }). - build() - ) { - } - assertTrue(metrics.closed.get(), "metrics were not closed"); - } - /** * Test setting some configuration values and reading them back. */ @@ -610,22 +579,6 @@ public class QuorumControllerTest { } } - private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() { - return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest()); - } - - private BrokerRegistrationRequestData.FeatureCollection brokerFeatures( - MetadataVersion minVersion, - MetadataVersion maxVersion - ) { - BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection(); - features.add(new BrokerRegistrationRequestData.Feature() - .setName(MetadataVersion.FEATURE_NAME) - .setMinSupportedVersion(minVersion.featureLevel()) - .setMaxSupportedVersion(maxVersion.featureLevel())); - return features; - } - private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures( MetadataVersion minVersion, MetadataVersion maxVersion @@ -782,7 +735,7 @@ public class QuorumControllerTest { build() ) { QuorumController controller = controlEnv.activeController(); - CountDownLatch 
countDownLatch = controller.pause(); + CountDownLatch countDownLatch = pause(controller); long now = controller.time().nanoseconds(); ControllerRequestContext context0 = new ControllerRequestContext( new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now)); @@ -846,7 +799,7 @@ public class QuorumControllerTest { build() ) { QuorumController controller = controlEnv.activeController(); - CountDownLatch countDownLatch = controller.pause(); + CountDownLatch countDownLatch = pause(controller); CompletableFuture<CreateTopicsResponseData> createFuture = controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). setTimeoutMs(120000), Collections.emptySet()); @@ -891,7 +844,7 @@ public class QuorumControllerTest { ) { QuorumController controller = controlEnv.activeController(); - Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers); + Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(controller, numBrokers); // Create a lot of partitions List<CreatableReplicaAssignment> partitions = IntStream @@ -980,62 +933,6 @@ public class QuorumControllerTest { } } - private Map<Integer, Long> registerBrokers(QuorumController controller, int numBrokers) throws Exception { - Map<Integer, Long> brokerEpochs = new HashMap<>(); - for (int brokerId = 0; brokerId < numBrokers; brokerId++) { - BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT, - new BrokerRegistrationRequestData() - .setBrokerId(brokerId) - .setRack(null) - .setClusterId(controller.clusterId()) - .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)) - .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId)) - .setListeners( - new ListenerCollection( - Arrays.asList( - new Listener() - .setName("PLAINTEXT") - .setHost("localhost") - .setPort(9092 + brokerId) - ).iterator() - ) - ) - ).get(); - brokerEpochs.put(brokerId, reply.epoch()); - - // Send heartbeat to unfence - 
controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT, - new BrokerHeartbeatRequestData() - .setWantFence(false) - .setBrokerEpoch(brokerEpochs.get(brokerId)) - .setBrokerId(brokerId) - .setCurrentMetadataOffset(100000L) - ).get(); - } - - return brokerEpochs; - } - - private void sendBrokerHeartbeat( - QuorumController controller, - List<Integer> brokers, - Map<Integer, Long> brokerEpochs - ) throws Exception { - if (brokers.isEmpty()) { - return; - } - for (Integer brokerId : brokers) { - BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT, - new BrokerHeartbeatRequestData() - .setWantFence(false) - .setBrokerEpoch(brokerEpochs.get(brokerId)) - .setBrokerId(brokerId) - .setCurrentMetadataOffset(100000) - ).get(); - assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply); - } - } - @Test public void testConfigResourceExistenceChecker() throws Throwable { try ( @@ -1048,7 +945,7 @@ public class QuorumControllerTest { build() ) { QuorumController active = controlEnv.activeController(); - registerBrokers(active, 5); + registerBrokersAndUnfence(active, 5); active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData(). setTopics(new CreatableTopicCollection(Collections.singleton( new CreatableTopic().setName("foo"). 
@@ -1508,7 +1405,8 @@ public class QuorumControllerTest { featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true); assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion()); - assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState()); } + assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState()); + } @Test public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception { diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java index 619100f1ed8..5bba7d0658b 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java @@ -40,14 +40,17 @@ public class QuorumControllerMetricsTest { try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time)) { ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", new HashSet<>(Arrays.asList( - "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs", "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs", + "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs", "kafka.controller:type=KafkaController,name=ActiveControllerCount", - "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset", + "kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount", + "kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount", "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs", "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset", "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp", - 
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset" + "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset", + "kafka.controller:type=KafkaController,name=NewActiveControllersCount", + "kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount" ))); } ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java index 1eefa15a8c3..372700b1fb6 100644 --- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java @@ -37,6 +37,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.fault.MockFaultHandler; import org.apache.kafka.snapshot.SnapshotReader; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -55,6 +56,7 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1; import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2; +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -251,6 +253,13 @@ public class MetadataLoaderTest { ) ); loader.handleLoadSnapshot(snapshotReader); + TestUtils.retryOnExceptionWithTimeout(30_000, () -> { + assertEquals(1L, loader.metrics().handleLoadSnapshotCount()); + }); + } else { + TestUtils.retryOnExceptionWithTimeout(30_000, () -> { + assertEquals(0L, 
loader.metrics().handleLoadSnapshotCount()); + }); } loader.waitForAllEventsToBeHandled(); if (sameObject) { @@ -328,6 +337,8 @@ public class MetadataLoaderTest { assertEquals(300L, loader.lastAppliedOffset()); assertEquals(new SnapshotManifest(new MetadataProvenance(300, 100, 4000), 3000000L), publishers.get(0).latestSnapshotManifest); + assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION, + loader.metrics().currentMetadataVersion()); } assertTrue(publishers.get(0).closed); assertEquals(MetadataVersion.IBP_3_0_IV1, @@ -587,14 +598,27 @@ public class MetadataLoaderTest { loadTestSnapshot(loader, 200); assertEquals(200L, loader.lastAppliedOffset()); + assertEquals(IBP_3_3_IV1.featureLevel(), + loader.metrics().currentMetadataVersion().featureLevel()); assertFalse(publishers.get(0).latestDelta.image().isEmpty()); loadTestSnapshot2(loader, 400); assertEquals(400L, loader.lastAppliedOffset()); + assertEquals(IBP_3_3_IV2.featureLevel(), + loader.metrics().currentMetadataVersion().featureLevel()); // Make sure the topic in the initial snapshot was overwritten by loading the new snapshot. assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo")); assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar")); + + loader.handleCommit(new MockBatchReader(500, asList( + MockBatchReader.newBatch(500, 100, asList( + new ApiMessageAndVersion(new FeatureLevelRecord(). + setName(MetadataVersion.FEATURE_NAME). 
+ setFeatureLevel(IBP_3_5_IV0.featureLevel()), (short) 0)))))); + loader.waitForAllEventsToBeHandled(); + assertEquals(IBP_3_5_IV0.featureLevel(), + loader.metrics().currentMetadataVersion().featureLevel()); } faultHandler.maybeRethrowFirstException(); } diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java new file mode 100644 index 00000000000..c8d33754106 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.image.loader.metrics; + +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.core.MetricName; +import com.yammer.metrics.core.MetricsRegistry; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils; +import org.apache.kafka.image.MetadataProvenance; +import org.junit.jupiter.api.Test; + +import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +public class MetadataLoaderMetricsTest { + private static class FakeMetadataLoaderMetrics implements AutoCloseable { + final AtomicLong batchProcessingTimeNs = new AtomicLong(0L); + final AtomicInteger batchSize = new AtomicInteger(0); + final AtomicReference<MetadataProvenance> provenance = + new AtomicReference<>(MetadataProvenance.EMPTY); + final MetadataLoaderMetrics metrics; + + FakeMetadataLoaderMetrics() { + this(Optional.empty()); + } + + FakeMetadataLoaderMetrics(MetricsRegistry registry) { + this(Optional.of(registry)); + } + + FakeMetadataLoaderMetrics(Optional<MetricsRegistry> registry) { + metrics = new MetadataLoaderMetrics( + registry, + n -> batchProcessingTimeNs.set(n), + n -> batchSize.set(n), + provenance); + } + + @Override + public void close() { + metrics.close(); + } + } + + @Test + public void testMetricNames() { + MetricsRegistry registry = new MetricsRegistry(); + try { + try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { + ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server", + new HashSet<>(Arrays.asList( + "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion", + 
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount" + ))); + } + ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server", + Collections.emptySet()); + } finally { + registry.shutdown(); + } + } + + @Test + public void testUpdateBatchProcessingTimeNs() { + MetricsRegistry registry = new MetricsRegistry(); + try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { + fakeMetrics.metrics.updateBatchProcessingTimeNs(123L); + assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get()); + } + } + + @Test + public void testUpdateBatchSize() { + MetricsRegistry registry = new MetricsRegistry(); + try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { + fakeMetrics.metrics.updateBatchSize(50); + assertEquals(50, fakeMetrics.batchSize.get()); + } + } + + @Test + public void testUpdateLastAppliedImageProvenance() { + MetricsRegistry registry = new MetricsRegistry(); + try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { + fakeMetrics.metrics.updateLastAppliedImageProvenance(new MetadataProvenance(1L, 2, 3L)); + assertEquals(new MetadataProvenance(1L, 2, 3L), fakeMetrics.provenance.get()); + } + } + + @Test + public void testManagedMetrics() { + MetricsRegistry registry = new MetricsRegistry(); + try { + try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) { + fakeMetrics.metrics.setCurrentMetadataVersion(IBP_3_3_IV2); + fakeMetrics.metrics.incrementHandleLoadSnapshotCount(); + fakeMetrics.metrics.incrementHandleLoadSnapshotCount(); + + @SuppressWarnings("unchecked") + Gauge<Integer> currentMetadataVersion = (Gauge<Integer>) registry + .allMetrics() + .get(metricName("MetadataLoader", "CurrentMetadataVersion")); + assertEquals(IBP_3_3_IV2.featureLevel(), + currentMetadataVersion.value().shortValue()); + + @SuppressWarnings("unchecked") + Gauge<Long> loadSnapshotCount = (Gauge<Long>) registry + .allMetrics() + 
.get(metricName("MetadataLoader", "HandleLoadSnapshotCount")); + assertEquals(2L, loadSnapshotCount.value().longValue()); + } + ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server", + Collections.emptySet()); + } finally { + registry.shutdown(); + } + } + + private static MetricName metricName(String type, String name) { + String mBeanName = String.format("kafka.server:type=%s,name=%s", type, name); + return new MetricName("kafka.server", type, name, null, mBeanName); + } +}
