This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new e3212f28eb8 KAFKA-14457; Controller metrics should only expose
committed data (#12994)
e3212f28eb8 is described below
commit e3212f28eb88e8f7fcf3d2d4c646b2a28b0f668e
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Dec 20 10:55:14 2022 -0800
KAFKA-14457; Controller metrics should only expose committed data (#12994)
The controller metrics in the controllers has three problems. 1) the active
controller exposes uncommitted data in the metrics. 2) the active controller
doesn't update the metrics when the uncommitted data gets aborted. 3) the
controller doesn't update the metrics when the entire state gets reset.
We fix these issues by only updating the metrics when processing committed
metadata records and reset the metrics when the metadata state is reset.
This change adds a new type `ControllerMetricsManager` which processes
committed metadata records and updates the metrics accordingly. This change
also removes metrics updating responsibilities from the rest of the controller
managers.
Reviewers: Ron Dagostino <[email protected]>
---
checkstyle/checkstyle.xml | 6 +
.../org/apache/kafka/controller/BrokersToIsrs.java | 17 -
.../kafka/controller/ClusterControlManager.java | 62 +---
.../apache/kafka/controller/ControllerMetrics.java | 4 +-
.../kafka/controller/ControllerMetricsManager.java | 318 ++++++++++++++++++
.../kafka/controller/PartitionChangeBuilder.java | 10 +-
.../apache/kafka/controller/QuorumController.java | 15 +-
.../kafka/controller/QuorumControllerMetrics.java | 8 +-
.../controller/ReplicationControlManager.java | 62 +---
.../apache/kafka/timeline/SnapshotRegistry.java | 2 +-
.../controller/ClusterControlManagerTest.java | 43 +--
.../controller/ControllerMetricsManagerTest.java | 364 +++++++++++++++++++++
.../kafka/controller/MockControllerMetrics.java | 4 +-
.../controller/ProducerIdControlManagerTest.java | 1 -
.../kafka/controller/QuorumControllerTest.java | 16 +-
.../controller/ReplicationControlManagerTest.java | 142 --------
16 files changed, 759 insertions(+), 315 deletions(-)
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index d0599f3d7a3..bf2d339da8c 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -143,9 +143,15 @@
<!-- default is 200 -->
<property name="max" value="500"/>
</module>
+
+ <!-- Allows the use of the @SuppressWarnings annotation in the code -->
+ <module name="SuppressWarningsHolder"/>
</module>
<module name="SuppressionFilter">
<property name="file" value="${suppressionsFile}"/>
</module>
+
+ <!-- Allows the use of the @SuppressWarnings annotation in the code -->
+ <module name="SuppressWarningsFilter"/>
</module>
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index ec48cbc57a6..5f72a109736 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
-import org.apache.kafka.timeline.TimelineInteger;
import java.util.Arrays;
import java.util.Collections;
@@ -105,12 +104,9 @@ public class BrokersToIsrs {
*/
private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>>
isrMembers;
- private final TimelineInteger offlinePartitionCount;
-
BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
- this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
}
/**
@@ -131,9 +127,6 @@ public class BrokersToIsrs {
} else {
if (prevLeader == NO_LEADER) {
prev = Replicas.copyWith(prevIsr, NO_LEADER);
- if (nextLeader != NO_LEADER) {
- offlinePartitionCount.decrement();
- }
} else {
prev = Replicas.clone(prevIsr);
}
@@ -145,9 +138,6 @@ public class BrokersToIsrs {
} else {
if (nextLeader == NO_LEADER) {
next = Replicas.copyWith(nextIsr, NO_LEADER);
- if (prevLeader != NO_LEADER) {
- offlinePartitionCount.increment();
- }
} else {
next = Replicas.clone(nextIsr);
}
@@ -191,9 +181,6 @@ public class BrokersToIsrs {
void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
if (topicMap != null) {
- if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
- offlinePartitionCount.set(offlinePartitionCount.get() -
topicMap.get(topicId).length);
- }
topicMap.remove(topicId);
}
}
@@ -303,8 +290,4 @@ public class BrokersToIsrs {
boolean hasLeaderships(int brokerId) {
return iterator(brokerId, true).hasNext();
}
-
- int offlinePartitionCount() {
- return offlinePartitionCount.get();
- }
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 5bc5bd2c3af..920e7ef0c65 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -86,7 +86,6 @@ public class ClusterControlManager {
private SnapshotRegistry snapshotRegistry = null;
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
private ReplicaPlacer replicaPlacer = null;
- private ControllerMetrics controllerMetrics = null;
private FeatureControlManager featureControl = null;
Builder setLogContext(LogContext logContext) {
@@ -119,11 +118,6 @@ public class ClusterControlManager {
return this;
}
- Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
- this.controllerMetrics = controllerMetrics;
- return this;
- }
-
Builder setFeatureControlManager(FeatureControlManager featureControl)
{
this.featureControl = featureControl;
return this;
@@ -142,9 +136,6 @@ public class ClusterControlManager {
if (replicaPlacer == null) {
replicaPlacer = new StripedReplicaPlacer(new Random());
}
- if (controllerMetrics == null) {
- throw new RuntimeException("You must specify
ControllerMetrics");
- }
if (featureControl == null) {
throw new RuntimeException("You must specify
FeatureControlManager");
}
@@ -154,7 +145,6 @@ public class ClusterControlManager {
snapshotRegistry,
sessionTimeoutNs,
replicaPlacer,
- controllerMetrics,
featureControl
);
}
@@ -226,11 +216,6 @@ public class ClusterControlManager {
*/
private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;
- /**
- * A reference to the controller's metrics registry.
- */
- private final ControllerMetrics controllerMetrics;
-
/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
@@ -254,7 +239,6 @@ public class ClusterControlManager {
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
- ControllerMetrics metrics,
FeatureControlManager featureControl
) {
this.logContext = logContext;
@@ -267,7 +251,6 @@ public class ClusterControlManager {
this.registerBrokerRecordOffsets = new
TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
- this.controllerMetrics = metrics;
this.featureControl = featureControl;
}
@@ -397,8 +380,9 @@ public class ClusterControlManager {
}
public OptionalLong registerBrokerRecordOffset(int brokerId) {
- if (registerBrokerRecordOffsets.containsKey(brokerId)) {
- return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
+ Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
+ if (registrationOffset != null) {
+ return OptionalLong.of(registrationOffset);
}
return OptionalLong.empty();
}
@@ -424,7 +408,6 @@ public class ClusterControlManager {
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown()));
- updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
@@ -451,7 +434,6 @@ public class ClusterControlManager {
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
- updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
}
}
@@ -480,11 +462,11 @@ public class ClusterControlManager {
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to
replay %s: unknown " +
- "value for fenced field: %d", record, record.fenced())));
+ "value for fenced field: %x", record, record.fenced())));
BrokerRegistrationInControlledShutdownChange
inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to
replay %s: unknown " +
- "value for inControlledShutdown field: %d", record,
record.inControlledShutdown())));
+ "value for inControlledShutdown field: %x", record,
record.inControlledShutdown())));
replayRegistrationChange(
record,
record.brokerId(),
@@ -515,7 +497,6 @@ public class ClusterControlManager {
);
if (!curRegistration.equals(nextRegistration)) {
brokerRegistrations.put(brokerId, nextRegistration);
- updateMetrics(curRegistration, nextRegistration);
} else {
log.info("Ignoring no-op registration change for {}",
curRegistration);
}
@@ -529,35 +510,6 @@ public class ClusterControlManager {
}
}
- private void updateMetrics(BrokerRegistration prevRegistration,
BrokerRegistration registration) {
- if (registration == null) {
- if (prevRegistration.fenced()) {
-
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() -
1);
- } else {
-
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() -
1);
- }
- log.info("Removed broker: {}", prevRegistration.id());
- } else if (prevRegistration == null) {
- if (registration.fenced()) {
-
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() +
1);
- log.info("Added new fenced broker: {}", registration.id());
- } else {
-
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() +
1);
- log.info("Added new unfenced broker: {}", registration.id());
- }
- } else {
- if (prevRegistration.fenced() && !registration.fenced()) {
-
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() -
1);
-
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() +
1);
- log.info("Unfenced broker: {}", registration.id());
- } else if (!prevRegistration.fenced() && registration.fenced()) {
-
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() +
1);
-
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() -
1);
- log.info("Fenced broker: {}", registration.id());
- }
- }
- }
-
Iterator<UsableBroker> usableBrokers() {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
@@ -570,7 +522,7 @@ public class ClusterControlManager {
* Returns true if the broker is unfenced; Returns false if it is
* not or if it does not exist.
*/
- public boolean unfenced(int brokerId) {
+ public boolean isUnfenced(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.fenced();
@@ -600,7 +552,7 @@ public class ClusterControlManager {
* Returns true if the broker is active. Active means not fenced nor in
controlled
* shutdown; Returns false if it is not active or if it does not exist.
*/
- public boolean active(int brokerId) {
+ public boolean isActive(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.inControlledShutdown() && !registration.fenced();
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index ff243aebfcb..baab5854004 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -35,9 +35,9 @@ public interface ControllerMetrics extends AutoCloseable {
int activeBrokerCount();
- void setGlobalTopicsCount(int topicCount);
+ void setGlobalTopicCount(int topicCount);
- int globalTopicsCount();
+ int globalTopicCount();
void setGlobalPartitionCount(int partitionCount);
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
new file mode 100644
index 00000000000..d034f42cdd4
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+ private final static class PartitionState {
+ final int leader;
+ final int preferredReplica;
+
+ PartitionState(int leader, int preferredReplica) {
+ this.leader = leader;
+ this.preferredReplica = preferredReplica;
+ }
+
+ int leader() {
+ return leader;
+ }
+
+ int preferredReplica() {
+ return preferredReplica;
+ }
+ }
+
+ private final Set<Integer> registeredBrokers = new HashSet<>();
+
+ private final Set<Integer> fencedBrokers = new HashSet<>();
+
+ private int topicCount = 0;
+
+ private final Map<TopicIdPartition, PartitionState> topicPartitions = new
HashMap<>();
+
+ private final Set<TopicIdPartition> offlineTopicPartitions = new
HashSet<>();
+
+ private final Set<TopicIdPartition> imbalancedTopicPartitions = new
HashSet<>();
+
+ private final ControllerMetrics controllerMetrics;
+
+ ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+ this.controllerMetrics = controllerMetrics;
+ }
+
+ void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+ int i = 1;
+ for (ApiMessageAndVersion message : messages) {
+ try {
+ replay(message.message());
+ } catch (Exception e) {
+ String failureMessage = String.format(
+ "Unable to update controller metrics for %s record, it was
%d of %d record(s) " +
+ "in the batch with baseOffset %d.",
+ message.message().getClass().getSimpleName(),
+ i,
+ messages.size(),
+ baseOffset
+ );
+ throw new IllegalArgumentException(failureMessage, e);
+ }
+ i++;
+ }
+ }
+
+ /**
+ * Update controller metrics by replaying a metadata record.
+ *
+ * This method assumes that the provided ApiMessage is one of the type
covered by MetadataRecordType.
+ *
+ * @param message a metadata record
+ */
+ @SuppressWarnings("checkstyle:cyclomaticComplexity")
+ void replay(ApiMessage message) {
+ MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+ switch (type) {
+ case REGISTER_BROKER_RECORD:
+ replay((RegisterBrokerRecord) message);
+ break;
+ case UNREGISTER_BROKER_RECORD:
+ replay((UnregisterBrokerRecord) message);
+ break;
+ case FENCE_BROKER_RECORD:
+ replay((FenceBrokerRecord) message);
+ break;
+ case UNFENCE_BROKER_RECORD:
+ replay((UnfenceBrokerRecord) message);
+ break;
+ case BROKER_REGISTRATION_CHANGE_RECORD:
+ replay((BrokerRegistrationChangeRecord) message);
+ break;
+ case TOPIC_RECORD:
+ replay((TopicRecord) message);
+ break;
+ case PARTITION_RECORD:
+ replay((PartitionRecord) message);
+ break;
+ case PARTITION_CHANGE_RECORD:
+ replay((PartitionChangeRecord) message);
+ break;
+ case REMOVE_TOPIC_RECORD:
+ replay((RemoveTopicRecord) message);
+ break;
+ case CONFIG_RECORD:
+ case FEATURE_LEVEL_RECORD:
+ case CLIENT_QUOTA_RECORD:
+ case PRODUCER_IDS_RECORD:
+ case ACCESS_CONTROL_ENTRY_RECORD:
+ case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+ case NO_OP_RECORD:
+ // These record types do not affect metrics
+ break;
+ default:
+ throw new RuntimeException("Unhandled record type " + type);
+ }
+ }
+
+ private void replay(RegisterBrokerRecord record) {
+ Integer brokerId = record.brokerId();
+ registeredBrokers.add(brokerId);
+ if (record.fenced()) {
+ fencedBrokers.add(brokerId);
+ } else {
+ fencedBrokers.remove(brokerId);
+ }
+
+ updateBrokerStateMetrics();
+ }
+
+ private void replay(UnregisterBrokerRecord record) {
+ Integer brokerId = record.brokerId();
+ registeredBrokers.remove(brokerId);
+ fencedBrokers.remove(brokerId);
+
+ updateBrokerStateMetrics();
+ }
+
+ private void replay(FenceBrokerRecord record) {
+ handleFencingChange(record.id(),
BrokerRegistrationFencingChange.FENCE);
+ }
+
+ private void replay(UnfenceBrokerRecord record) {
+ handleFencingChange(record.id(),
BrokerRegistrationFencingChange.UNFENCE);
+ }
+
+ private void replay(BrokerRegistrationChangeRecord record) {
+ BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange
+ .fromValue(record.fenced())
+ .orElseThrow(() -> {
+ return new IllegalArgumentException(
+ String.format(
+ "Registration change record for %d has unknown value
for fenced field: %x",
+ record.brokerId(),
+ record.fenced()
+ )
+ );
+ });
+
+ handleFencingChange(record.brokerId(), fencingChange);
+ }
+
+ private void handleFencingChange(Integer brokerId,
BrokerRegistrationFencingChange fencingChange) {
+ if (!registeredBrokers.contains(brokerId)) {
+ throw new IllegalArgumentException(String.format("Broker with id
%s is not registered", brokerId));
+ }
+
+ if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+ fencedBrokers.add(brokerId);
+ updateBrokerStateMetrics();
+ } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+ fencedBrokers.remove(brokerId);
+ updateBrokerStateMetrics();
+ } else {
+ // The fencingChange value is NONE. In this case the controller
doesn't need to update the broker
+ // state metrics.
+ }
+ }
+
+ private void updateBrokerStateMetrics() {
+ controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+ Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+ activeBrokers.removeAll(fencedBrokers);
+ controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+ }
+
+ private void replay(TopicRecord record) {
+ topicCount++;
+
+ controllerMetrics.setGlobalTopicCount(topicCount);
+ }
+
+ private void replay(PartitionRecord record) {
+ TopicIdPartition tp = new TopicIdPartition(record.topicId(),
record.partitionId());
+
+ PartitionState partitionState = new PartitionState(record.leader(),
record.replicas().get(0));
+ topicPartitions.put(tp, partitionState);
+
+ updateBasedOnPartitionState(tp, partitionState);
+
+ updateTopicAndPartitionMetrics();
+ }
+
+ private void replay(PartitionChangeRecord record) {
+ TopicIdPartition tp = new TopicIdPartition(record.topicId(),
record.partitionId());
+ if (!topicPartitions.containsKey(tp)) {
+ throw new IllegalArgumentException(String.format("Unknown topic
partitions %s", tp));
+ }
+
+ PartitionState partitionState = topicPartitions.computeIfPresent(
+ tp,
+ (key, oldValue) -> {
+ PartitionState newValue = oldValue;
+ // Update replicas
+ if (record.replicas() != null) {
+ newValue = new PartitionState(newValue.leader(),
record.replicas().get(0));
+ }
+
+ if (record.leader() != NO_LEADER_CHANGE) {
+ newValue = new PartitionState(record.leader(),
newValue.preferredReplica());
+ }
+
+ return newValue;
+ }
+ );
+
+ updateBasedOnPartitionState(tp, partitionState);
+
+ updateTopicAndPartitionMetrics();
+ }
+
+ private void replay(RemoveTopicRecord record) {
+ Uuid topicId = record.topicId();
+ Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId() ==
topicId;
+
+ topicCount--;
+ topicPartitions.keySet().removeIf(matchesTopic);
+ offlineTopicPartitions.removeIf(matchesTopic);
+ imbalancedTopicPartitions.removeIf(matchesTopic);
+
+ updateTopicAndPartitionMetrics();
+ }
+
+ private void updateBasedOnPartitionState(TopicIdPartition tp,
PartitionState partitionState) {
+ if (partitionState.leader() == NO_LEADER) {
+ offlineTopicPartitions.add(tp);
+ } else {
+ offlineTopicPartitions.remove(tp);
+ }
+
+ if (partitionState.leader() == partitionState.preferredReplica()) {
+ imbalancedTopicPartitions.remove(tp);
+ } else {
+ imbalancedTopicPartitions.add(tp);
+ }
+ }
+
+ private void updateTopicAndPartitionMetrics() {
+ controllerMetrics.setGlobalTopicCount(topicCount);
+ controllerMetrics.setGlobalPartitionCount(topicPartitions.size());
+
controllerMetrics.setOfflinePartitionCount(offlineTopicPartitions.size());
+
controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedTopicPartitions.size());
+ }
+
+ /**
+ * Resets the value of all of the metrics.
+ *
+ * Resets all of the state tracked by this type and resets all of the
related controller metrics.
+ */
+ void reset() {
+ registeredBrokers.clear();
+ fencedBrokers.clear();
+ topicCount = 0;
+ topicPartitions.clear();
+ offlineTopicPartitions.clear();
+ imbalancedTopicPartitions.clear();
+
+ updateBrokerStateMetrics();
+ updateTopicAndPartitionMetrics();
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index cdd6d4416f1..e35bf7ef62b 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -30,7 +30,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.function.Function;
+import java.util.function.IntPredicate;
import static
org.apache.kafka.common.metadata.MetadataRecordType.PARTITION_CHANGE_RECORD;
import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
@@ -73,7 +73,7 @@ public class PartitionChangeBuilder {
private final PartitionRegistration partition;
private final Uuid topicId;
private final int partitionId;
- private final Function<Integer, Boolean> isAcceptableLeader;
+ private final IntPredicate isAcceptableLeader;
private final boolean isLeaderRecoverySupported;
private List<Integer> targetIsr;
private List<Integer> targetReplicas;
@@ -85,7 +85,7 @@ public class PartitionChangeBuilder {
public PartitionChangeBuilder(PartitionRegistration partition,
Uuid topicId,
int partitionId,
- Function<Integer, Boolean>
isAcceptableLeader,
+ IntPredicate isAcceptableLeader,
boolean isLeaderRecoverySupported) {
this.partition = partition;
this.topicId = topicId;
@@ -198,7 +198,7 @@ public class PartitionChangeBuilder {
if (election == Election.UNCLEAN) {
// Attempt unclean leader election
Optional<Integer> uncleanLeader = targetReplicas.stream()
- .filter(replica -> isAcceptableLeader.apply(replica))
+ .filter(replica -> isAcceptableLeader.test(replica))
.findFirst();
if (uncleanLeader.isPresent()) {
return new ElectionResult(uncleanLeader.get(), true);
@@ -209,7 +209,7 @@ public class PartitionChangeBuilder {
}
private boolean isValidNewLeader(int replica) {
- return targetIsr.contains(replica) &&
isAcceptableLeader.apply(replica);
+ return targetIsr.contains(replica) && isAcceptableLeader.test(replica);
}
private void tryElection(PartitionChangeRecord record) {
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index ef758c71598..55e80687219 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -979,6 +979,9 @@ public final class QuorumController implements Controller {
i++;
}
}
+
+
controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+
updateLastCommittedState(
offset,
epoch,
@@ -1028,6 +1031,7 @@ public final class QuorumController implements Controller
{
for (ApiMessageAndVersion message : messages) {
try {
replay(message.message(),
Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
+
controllerMetricsManager.replay(message.message());
} catch (Throwable e) {
String failureMessage = String.format("Unable
to apply %s record " +
"from snapshot %s on standby
controller, which was %d of " +
@@ -1039,6 +1043,7 @@ public final class QuorumController implements Controller
{
i++;
}
}
+
updateLastCommittedState(
reader.lastContainedLogOffset(),
reader.lastContainedLogEpoch(),
@@ -1482,6 +1487,7 @@ public final class QuorumController implements Controller
{
private void resetToEmptyState() {
snapshotGeneratorManager.cancel();
snapshotRegistry.reset();
+ controllerMetricsManager.reset();
updateLastCommittedState(-1, -1, -1, 0);
}
@@ -1527,6 +1533,12 @@ public final class QuorumController implements
Controller {
*/
private final ControllerMetrics controllerMetrics;
+
+ /**
+ * Manager for updating controller metrics based on the committed metadata.
+ */
+ private final ControllerMetricsManager controllerMetricsManager;
+
/**
* A registry for snapshot data. This must be accessed only by the event
queue thread.
*/
@@ -1731,6 +1743,7 @@ public final class QuorumController implements Controller
{
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
+ this.controllerMetricsManager = new
ControllerMetricsManager(controllerMetrics);
this.snapshotRegistry = new SnapshotRegistry(logContext);
this.purgatory = new ControllerPurgatory();
this.resourceExists = new ConfigResourceExistenceChecker();
@@ -1763,7 +1776,6 @@ public final class QuorumController implements Controller
{
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(sessionTimeoutNs).
setReplicaPlacer(replicaPlacer).
- setControllerMetrics(controllerMetrics).
setFeatureControlManager(featureControl).
build();
this.producerIdControlManager = new
ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -1778,7 +1790,6 @@ public final class QuorumController implements Controller
{
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
- setControllerMetrics(controllerMetrics).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
build();
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index b96a687b0f3..00413c4bd53 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -49,7 +49,7 @@ public final class QuorumControllerMetrics implements
ControllerMetrics {
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT =
getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
private final static MetricName METADATA_ERROR_COUNT = getMetricName(
- "KafkaController", "MetadataErrorCount");
+ "KafkaController", "MetadataErrorCount");
private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private final static MetricName LAST_COMMITTED_RECORD_OFFSET =
getMetricName(
@@ -67,7 +67,7 @@ public final class QuorumControllerMetrics implements
ControllerMetrics {
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
- private volatile AtomicInteger metadataErrorCount;
+ private final AtomicInteger metadataErrorCount;
private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
@@ -215,12 +215,12 @@ public final class QuorumControllerMetrics implements
ControllerMetrics {
}
@Override
- public void setGlobalTopicsCount(int topicCount) {
+ public void setGlobalTopicCount(int topicCount) {
this.globalTopicCount = topicCount;
}
@Override
- public int globalTopicsCount() {
+ public int globalTopicCount() {
return this.globalTopicCount;
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 1fc713c207f..b84be096c99 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -92,7 +92,6 @@ import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
-import org.apache.kafka.timeline.TimelineInteger;
import org.slf4j.Logger;
import java.util.AbstractMap.SimpleImmutableEntry;
@@ -110,7 +109,7 @@ import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -151,7 +150,6 @@ public class ReplicationControlManager {
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
private ConfigurationControlManager configurationControl = null;
private ClusterControlManager clusterControl = null;
- private ControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy =
Optional.empty();
private FeatureControlManager featureControl = null;
@@ -190,11 +188,6 @@ public class ReplicationControlManager {
return this;
}
- Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
- this.controllerMetrics = controllerMetrics;
- return this;
- }
-
Builder setCreateTopicPolicy(Optional<CreateTopicPolicy>
createTopicPolicy) {
this.createTopicPolicy = createTopicPolicy;
return this;
@@ -210,8 +203,6 @@ public class ReplicationControlManager {
throw new IllegalStateException("Configuration control must be
set before building");
} else if (clusterControl == null) {
throw new IllegalStateException("Cluster controller must be
set before building");
- } else if (controllerMetrics == null) {
- throw new IllegalStateException("Metrics must be set before
building");
}
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry =
configurationControl.snapshotRegistry();
@@ -232,7 +223,6 @@ public class ReplicationControlManager {
maxElectionsPerImbalance,
configurationControl,
clusterControl,
- controllerMetrics,
createTopicPolicy,
featureControl);
}
@@ -294,11 +284,6 @@ public class ReplicationControlManager {
*/
private final int maxElectionsPerImbalance;
- /**
- * A count of the total number of partitions in the cluster.
- */
- private final TimelineInteger globalPartitionCount;
-
/**
* A reference to the controller's configuration control manager.
*/
@@ -309,11 +294,6 @@ public class ReplicationControlManager {
*/
private final ClusterControlManager clusterControl;
- /**
- * A reference to the controller's metrics registry.
- */
- private final ControllerMetrics controllerMetrics;
-
/**
* The policy to use to validate that topic assignments are valid, if one
is present.
*/
@@ -379,7 +359,6 @@ public class ReplicationControlManager {
int maxElectionsPerImbalance,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
- ControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
FeatureControlManager featureControl
) {
@@ -389,11 +368,9 @@ public class ReplicationControlManager {
this.defaultNumPartitions = defaultNumPartitions;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
this.configurationControl = configurationControl;
- this.controllerMetrics = controllerMetrics;
this.createTopicPolicy = createTopicPolicy;
this.featureControl = featureControl;
this.clusterControl = clusterControl;
- this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topicsWithCollisionChars = new
TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
@@ -415,7 +392,6 @@ public class ReplicationControlManager {
}
topics.put(record.topicId(),
new TopicControlInfo(record.name(), snapshotRegistry,
record.topicId()));
- controllerMetrics.setGlobalTopicsCount(topics.size());
log.info("Created topic {} with topic ID {}.", record.name(),
record.topicId());
}
@@ -434,8 +410,6 @@ public class ReplicationControlManager {
topicInfo.parts.put(record.partitionId(), newPartInfo);
brokersToIsrs.update(record.topicId(), record.partitionId(), null,
newPartInfo.isr, NO_LEADER, newPartInfo.leader);
- globalPartitionCount.increment();
-
controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
updateReassigningTopicsIfNeeded(record.topicId(),
record.partitionId(),
false, newPartInfo.isReassigning());
} else if (!newPartInfo.equals(prevPartInfo)) {
@@ -452,9 +426,6 @@ public class ReplicationControlManager {
} else {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(),
record.partitionId()));
}
-
-
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
-
controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
}
private void updateReassigningTopicsIfNeeded(Uuid topicId, int partitionId,
@@ -503,9 +474,6 @@ public class ReplicationControlManager {
imbalancedPartitions.add(new TopicIdPartition(record.topicId(),
record.partitionId()));
}
-
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
-
controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
-
if (record.removingReplicas() != null || record.addingReplicas() !=
null) {
log.info("Replayed partition assignment change {} for topic {}",
record, topicInfo.name);
} else if (log.isTraceEnabled()) {
@@ -546,15 +514,9 @@ public class ReplicationControlManager {
}
imbalancedPartitions.remove(new TopicIdPartition(record.topicId(),
partitionId));
-
- globalPartitionCount.decrement();
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
- controllerMetrics.setGlobalTopicsCount(topics.size());
- controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
-
controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
-
controllerMetrics.setPreferredReplicaImbalanceCount(imbalancedPartitions.size());
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
}
@@ -657,7 +619,7 @@ public class ReplicationControlManager {
validateManualPartitionAssignment(assignment.brokerIds(),
replicationFactor);
replicationFactor =
OptionalInt.of(assignment.brokerIds().size());
List<Integer> isr = assignment.brokerIds().stream().
-
filter(clusterControl::active).collect(Collectors.toList());
+
filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
return new ApiError(Errors.INVALID_REPLICA_ASSIGNMENT,
"All brokers specified in the manual partition
assignment for " +
@@ -701,7 +663,7 @@ public class ReplicationControlManager {
for (int partitionId = 0; partitionId < partitions.size();
partitionId++) {
List<Integer> replicas = partitions.get(partitionId);
List<Integer> isr = replicas.stream().
-
filter(clusterControl::active).collect(Collectors.toList());
+
filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are
fenced or
// in controlled shutdown. To be consistent with the
replica placer,
// we reject the create topic request with
INVALID_REPLICATION_FACTOR.
@@ -980,7 +942,7 @@ public class ReplicationControlManager {
partition,
topic.id,
partitionId,
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1356,7 +1318,7 @@ public class ReplicationControlManager {
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId,
partitionId,
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
builder.setElection(election);
Optional<ApiMessageAndVersion> record = builder.build();
@@ -1471,7 +1433,7 @@ public class ReplicationControlManager {
partition,
topicPartition.topicId(),
topicPartition.partitionId(),
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
@@ -1554,7 +1516,7 @@ public class ReplicationControlManager {
OptionalInt.of(replicationFactor));
placements.add(assignment.brokerIds());
List<Integer> isr = assignment.brokerIds().stream().
-
filter(clusterControl::active).collect(Collectors.toList());
+
filter(clusterControl::isActive).collect(Collectors.toList());
if (isr.isEmpty()) {
throw new InvalidReplicaAssignmentException(
"All brokers specified in the manual partition
assignment for " +
@@ -1574,7 +1536,7 @@ public class ReplicationControlManager {
for (int i = 0; i < placements.size(); i++) {
List<Integer> replicas = placements.get(i);
List<Integer> isr = isrs.get(i).stream().
- filter(clusterControl::active).collect(Collectors.toList());
+ filter(clusterControl::isActive).collect(Collectors.toList());
// If the ISR is empty, it means that all brokers are fenced or
// in controlled shutdown. To be consistent with the replica
placer,
// we reject the create topic request with
INVALID_REPLICATION_FACTOR.
@@ -1660,8 +1622,8 @@ public class ReplicationControlManager {
// from the target ISR, but we need to exclude it here too, to handle
the case
// where there is an unclean leader election which chooses a leader
from outside
// the ISR.
- Function<Integer, Boolean> isAcceptableLeader =
- r -> (r != brokerToRemove) && (r == brokerToAdd ||
clusterControl.active(r));
+ IntPredicate isAcceptableLeader =
+ r -> (r != brokerToRemove) && (r == brokerToAdd ||
clusterControl.isActive(r));
while (iterator.hasNext()) {
TopicIdPartition topicIdPart = iterator.next();
@@ -1788,7 +1750,7 @@ public class ReplicationControlManager {
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
tp.topicId(),
tp.partitionId(),
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
@@ -1840,7 +1802,7 @@ public class ReplicationControlManager {
PartitionChangeBuilder builder = new PartitionChangeBuilder(part,
tp.topicId(),
tp.partitionId(),
- clusterControl::active,
+ clusterControl::isActive,
featureControl.metadataVersion().isLeaderRecoverySupported());
if (!reassignment.merged().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.merged());
diff --git
a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index 0df9c8b8f56..25177d45d71 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -264,7 +264,7 @@ public class SnapshotRegistry {
}
/**
- * Associate with this registry.
+ * Associate a revertable with this registry.
*/
public void register(Revertable revertable) {
revertables.add(revertable);
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index ceb12cc6254..55bb6df9568 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -86,11 +86,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
RegisterBrokerRecord brokerRecord = new
RegisterBrokerRecord().setBrokerEpoch(100).setBrokerId(1);
brokerRecord.endPoints().add(new BrokerEndpoint().
@@ -104,8 +103,8 @@ public class ClusterControlManagerTest {
() -> clusterControl.checkBrokerEpoch(1, 101));
assertThrows(StaleBrokerEpochException.class,
() -> clusterControl.checkBrokerEpoch(2, 100));
- assertFalse(clusterControl.unfenced(0));
- assertFalse(clusterControl.unfenced(1));
+ assertFalse(clusterControl.isUnfenced(0));
+ assertFalse(clusterControl.isUnfenced(1));
if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
UnfenceBrokerRecord unfenceBrokerRecord =
@@ -116,8 +115,8 @@ public class ClusterControlManagerTest {
new
BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
clusterControl.replay(changeRecord);
}
- assertFalse(clusterControl.unfenced(0));
- assertTrue(clusterControl.unfenced(1));
+ assertFalse(clusterControl.isUnfenced(0));
+ assertTrue(clusterControl.isUnfenced(1));
if (metadataVersion.isLessThan(IBP_3_3_IV2)) {
FenceBrokerRecord fenceBrokerRecord =
@@ -128,8 +127,8 @@ public class ClusterControlManagerTest {
new
BrokerRegistrationChangeRecord().setBrokerId(1).setBrokerEpoch(100).setFenced(BrokerRegistrationFencingChange.FENCE.value());
clusterControl.replay(changeRecord);
}
- assertFalse(clusterControl.unfenced(0));
- assertFalse(clusterControl.unfenced(1));
+ assertFalse(clusterControl.isUnfenced(0));
+ assertFalse(clusterControl.isUnfenced(1));
}
@Test
@@ -149,11 +148,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -169,20 +167,20 @@ public class ClusterControlManagerTest {
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
brokerRecord.setInControlledShutdown(false);
clusterControl.replay(brokerRecord, 100L);
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
assertEquals(100L,
clusterControl.registerBrokerRecordOffset(brokerRecord.brokerId()).getAsLong());
brokerRecord.setFenced(false);
clusterControl.replay(brokerRecord, 100L);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
}
@@ -203,11 +201,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
@@ -222,7 +219,7 @@ public class ClusterControlManagerTest {
setHost("example.com"));
clusterControl.replay(brokerRecord, 100L);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertFalse(clusterControl.inControlledShutdown(0));
BrokerRegistrationChangeRecord registrationChangeRecord = new
BrokerRegistrationChangeRecord()
@@ -231,7 +228,7 @@ public class ClusterControlManagerTest {
.setInControlledShutdown(BrokerRegistrationInControlledShutdownChange.IN_CONTROLLED_SHUTDOWN.value());
clusterControl.replay(registrationChangeRecord);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
registrationChangeRecord = new BrokerRegistrationChangeRecord()
@@ -240,7 +237,7 @@ public class ClusterControlManagerTest {
.setFenced(BrokerRegistrationFencingChange.UNFENCE.value());
clusterControl.replay(registrationChangeRecord);
- assertTrue(clusterControl.unfenced(0));
+ assertTrue(clusterControl.isUnfenced(0));
assertTrue(clusterControl.inControlledShutdown(0));
}
@@ -259,7 +256,6 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -289,7 +285,6 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -345,7 +340,6 @@ public class ClusterControlManagerTest {
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -380,7 +374,6 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
@@ -399,7 +392,7 @@ public class ClusterControlManagerTest {
clusterControl.heartbeatManager().touch(i, false, 0);
}
for (int i = 0; i < numUsableBrokers; i++) {
- assertTrue(clusterControl.unfenced(i),
+ assertTrue(clusterControl.isUnfenced(i),
String.format("broker %d was not unfenced.", i));
}
for (int i = 0; i < 100; i++) {
@@ -439,11 +432,10 @@ public class ClusterControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
- assertFalse(clusterControl.unfenced(0));
+ assertFalse(clusterControl.isUnfenced(0));
for (int i = 0; i < 3; i++) {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(100).setBrokerId(i).setRack(null);
@@ -513,7 +505,6 @@ public class ClusterControlManagerTest {
setClusterId("fPZv1VBsRFmnlRvmGcOW9w").
setTime(new MockTime(0, 0, 0)).
setSnapshotRegistry(snapshotRegistry).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
clusterControl.activate();
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
new file mode 100644
index 00000000000..8cfeea417be
--- /dev/null
+++
b/metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class ControllerMetricsManagerTest {
+ @Test
+ public void testActiveBrokerRegistration() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, false));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testFenceBrokerRegistration() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerChangedToActive() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerChange(1, 1,
BrokerRegistrationFencingChange.UNFENCE));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerLegacyChangedToActive() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerUnfence(1, 1));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerChangedToFence() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, false));
+ manager.replay(brokerChange(1, 1,
BrokerRegistrationFencingChange.FENCE));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+
+ @Test
+ public void testBrokerLegacyChangedToFence() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, false));
+ manager.replay(brokerFence(1, 1));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerUnchanged() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerChange(1, 1,
BrokerRegistrationFencingChange.NONE));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testBrokerUnregister() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+ manager.replay(brokerRegistration(2, 1, false));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(1, metrics.fencedBrokerCount());
+ manager.replay(brokerUnregistration(1, 1));
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ manager.replay(brokerUnregistration(2, 1));
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testReplayBatch() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replayBatch(
+ 0,
+ Arrays.asList(
+ new ApiMessageAndVersion(brokerRegistration(1, 1, true),
(short) 0),
+ new ApiMessageAndVersion(brokerChange(1, 1,
BrokerRegistrationFencingChange.UNFENCE), (short) 0)
+ )
+ );
+ assertEquals(1, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ }
+
+ @Test
+ public void testTopicCountIncreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(topicRecord("test"));
+ assertEquals(1, metrics.globalTopicCount());
+ }
+
+ @Test
+ public void testTopicCountDecreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(removeTopicRecord(id));
+ assertEquals(0, metrics.globalTopicCount());
+ }
+
+ @Test
+ public void testPartitionCountIncreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ assertEquals(0, metrics.globalPartitionCount());
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+ assertEquals(1, metrics.globalPartitionCount());
+ manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
+ assertEquals(2, metrics.globalPartitionCount());
+ }
+
+ @Test
+ public void testPartitionCountDecreased() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+ manager.replay(partitionRecord(id, 1, 0, Arrays.asList(0, 1, 2)));
+ manager.replay(removeTopicRecord(id));
+ assertEquals(0, metrics.globalPartitionCount());
+ }
+
+ @Test
+ public void testOfflinePartition() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, NO_LEADER, Arrays.asList(0, 1,
2)));
+ assertEquals(1, metrics.offlinePartitionCount());
+ }
+
+ @Test
+ public void testImbalancedPartition() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 1, Arrays.asList(0, 1, 2)));
+ assertEquals(1, metrics.preferredReplicaImbalanceCount());
+ }
+
+ @Test
+ public void testPartitionChange() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(NO_LEADER),
Optional.empty()));
+ assertEquals(1, metrics.offlinePartitionCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(1),
Optional.empty()));
+ assertEquals(0, metrics.offlinePartitionCount());
+ assertEquals(1, metrics.preferredReplicaImbalanceCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(0),
Optional.empty()));
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.empty(),
Optional.of(Arrays.asList(1, 2, 0))));
+ assertEquals(1, metrics.preferredReplicaImbalanceCount());
+
+ manager.replay(partitionChangeRecord(id, 0, OptionalInt.of(2),
Optional.of(Arrays.asList(2, 0, 1))));
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+ }
+
+ @Test
+ public void testStartingMetrics() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ assertEquals(0, metrics.globalTopicCount());
+ assertEquals(0, metrics.globalPartitionCount());
+ assertEquals(0, metrics.offlinePartitionCount());
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+ }
+
+ @Test
+ public void testReset() {
+ ControllerMetrics metrics = new MockControllerMetrics();
+ ControllerMetricsManager manager = new
ControllerMetricsManager(metrics);
+
+ manager.replay(brokerRegistration(1, 1, true));
+
+ Uuid id = Uuid.randomUuid();
+ manager.replay(topicRecord("test", id));
+ manager.replay(partitionRecord(id, 0, 0, Arrays.asList(0, 1, 2)));
+
+ manager.reset();
+
+ assertEquals(0, metrics.activeBrokerCount());
+ assertEquals(0, metrics.fencedBrokerCount());
+ assertEquals(0, metrics.globalTopicCount());
+ assertEquals(0, metrics.globalPartitionCount());
+ assertEquals(0, metrics.offlinePartitionCount());
+ assertEquals(0, metrics.preferredReplicaImbalanceCount());
+ }
+
+ private static RegisterBrokerRecord brokerRegistration(
+ int brokerId,
+ long epoch,
+ boolean fenced
+ ) {
+ return new RegisterBrokerRecord()
+ .setBrokerId(brokerId)
+ .setIncarnationId(Uuid.randomUuid())
+ .setBrokerEpoch(epoch)
+ .setFenced(fenced);
+ }
+
+ private static UnregisterBrokerRecord brokerUnregistration(
+ int brokerId,
+ long epoch
+ ) {
+ return new UnregisterBrokerRecord()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(epoch);
+ }
+
+ private static BrokerRegistrationChangeRecord brokerChange(
+ int brokerId,
+ long epoch,
+ BrokerRegistrationFencingChange fencing
+ ) {
+ return new BrokerRegistrationChangeRecord()
+ .setBrokerId(brokerId)
+ .setBrokerEpoch(epoch)
+ .setFenced(fencing.value());
+ }
+
+ private static UnfenceBrokerRecord brokerUnfence(int brokerId, long epoch)
{
+ return new UnfenceBrokerRecord()
+ .setId(brokerId)
+ .setEpoch(epoch);
+ }
+
+ private static FenceBrokerRecord brokerFence(int brokerId, long epoch) {
+ return new FenceBrokerRecord()
+ .setId(brokerId)
+ .setEpoch(epoch);
+ }
+
+ private static TopicRecord topicRecord(String name) {
+ return new TopicRecord().setName(name).setTopicId(Uuid.randomUuid());
+ }
+
+ private static TopicRecord topicRecord(String name, Uuid id) {
+ return new TopicRecord().setName(name).setTopicId(id);
+ }
+
+ private static RemoveTopicRecord removeTopicRecord(Uuid id) {
+ return new RemoveTopicRecord().setTopicId(id);
+ }
+
+ private static PartitionRecord partitionRecord(
+ Uuid id,
+ int partition,
+ int leader,
+ List<Integer> replicas
+ ) {
+ return new PartitionRecord()
+ .setPartitionId(partition)
+ .setTopicId(id)
+ .setReplicas(replicas)
+ .setIsr(replicas)
+ .setLeader(leader);
+ }
+
+ private static PartitionChangeRecord partitionChangeRecord(
+ Uuid id,
+ int partition,
+ OptionalInt leader,
+ Optional<List<Integer>> replicas
+ ) {
+ PartitionChangeRecord record = new PartitionChangeRecord();
+ leader.ifPresent(record::setLeader);
+ replicas.ifPresent(record::setReplicas);
+ replicas.ifPresent(record::setIsr);
+
+ return record
+ .setPartitionId(partition)
+ .setTopicId(id);
+ }
+}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index ca13d90ddea..4a0155ed1bd 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -75,12 +75,12 @@ public final class MockControllerMetrics implements
ControllerMetrics {
}
@Override
- public void setGlobalTopicsCount(int topicCount) {
+ public void setGlobalTopicCount(int topicCount) {
this.topics = topicCount;
}
@Override
- public int globalTopicsCount() {
+ public int globalTopicCount() {
return this.topics;
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 80c5c505ae0..8c6b1281e23 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -64,7 +64,6 @@ public class ProducerIdControlManagerTest {
setTime(time).
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(1000).
- setControllerMetrics(new MockControllerMetrics()).
setFeatureControlManager(featureControl).
build();
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 90807a2e2a7..9c427d17da1 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -277,7 +277,7 @@ public class QuorumControllerTest {
// Brokers are only registered and should still be fenced
allBrokers.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
@@ -297,7 +297,7 @@ public class QuorumControllerTest {
TestUtils.waitForCondition(() -> {
sendBrokerheartbeat(active, brokersToKeepUnfenced,
brokerEpochs);
for (Integer brokerId : brokersToFence) {
- if (active.clusterControl().unfenced(brokerId)) {
+ if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
@@ -311,11 +311,11 @@ public class QuorumControllerTest {
// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
- assertTrue(active.clusterControl().unfenced(brokerId),
+ assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
@@ -376,7 +376,7 @@ public class QuorumControllerTest {
// Brokers are only registered and should still be fenced
allBrokers.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
@@ -396,7 +396,7 @@ public class QuorumControllerTest {
() -> {
sendBrokerheartbeat(active, brokersToKeepUnfenced,
brokerEpochs);
for (Integer brokerId : brokersToFence) {
- if (active.clusterControl().unfenced(brokerId)) {
+ if (active.clusterControl().isUnfenced(brokerId)) {
return false;
}
}
@@ -411,11 +411,11 @@ public class QuorumControllerTest {
// At this point only the brokers we want fenced should be fenced.
brokersToKeepUnfenced.forEach(brokerId -> {
- assertTrue(active.clusterControl().unfenced(brokerId),
+ assertTrue(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been unfenced");
});
brokersToFence.forEach(brokerId -> {
- assertFalse(active.clusterControl().unfenced(brokerId),
+ assertFalse(active.clusterControl().isUnfenced(brokerId),
"Broker " + brokerId + " should have been fenced");
});
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 335fd52a07d..a9e25d57144 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -151,7 +151,6 @@ public class ReplicationControlManagerTest {
final LogContext logContext = new LogContext();
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
- final ControllerMetrics metrics = new MockControllerMetrics();
final FeatureControlManager featureControl = new
FeatureControlManager.Builder().
setSnapshotRegistry(snapshotRegistry).
setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
@@ -165,7 +164,6 @@ public class ReplicationControlManagerTest {
setSnapshotRegistry(snapshotRegistry).
setSessionTimeoutNs(TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS,
TimeUnit.NANOSECONDS)).
setReplicaPlacer(new StripedReplicaPlacer(random)).
- setControllerMetrics(metrics).
setFeatureControlManager(featureControl).
build();
final ConfigurationControlManager configurationControl = new
ConfigurationControlManager.Builder().
@@ -206,7 +204,6 @@ public class ReplicationControlManagerTest {
setMaxElectionsPerImbalance(Integer.MAX_VALUE).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
- setControllerMetrics(metrics).
setCreateTopicPolicy(createTopicPolicy).
setFeatureControl(featureControl).
build();
@@ -605,41 +602,6 @@ public class ReplicationControlManagerTest {
);
}
- @Test
- public void testBrokerCountMetrics() throws Exception {
- ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
- ReplicationControlManager replicationControl = ctx.replicationControl;
-
- ctx.registerBrokers(0);
-
- assertEquals(1, ctx.metrics.fencedBrokerCount());
- assertEquals(0, ctx.metrics.activeBrokerCount());
-
- ctx.unfenceBrokers(0);
-
- assertEquals(0, ctx.metrics.fencedBrokerCount());
- assertEquals(1, ctx.metrics.activeBrokerCount());
-
- ctx.registerBrokers(1);
- ctx.unfenceBrokers(1);
-
- assertEquals(2, ctx.metrics.activeBrokerCount());
-
- ctx.registerBrokers(2);
- ctx.unfenceBrokers(2);
-
- assertEquals(0, ctx.metrics.fencedBrokerCount());
- assertEquals(3, ctx.metrics.activeBrokerCount());
-
- ControllerResult<Void> result = replicationControl.unregisterBroker(0);
- ctx.replay(result.records());
- result = replicationControl.unregisterBroker(2);
- ctx.replay(result.records());
-
- assertEquals(0, ctx.metrics.fencedBrokerCount());
- assertEquals(1, ctx.metrics.activeBrokerCount());
- }
-
@Test
public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
@@ -698,107 +660,6 @@ public class ReplicationControlManagerTest {
ctx.createTestTopic("quux", new int[][] {new int[] {1, 2, 0}},
POLICY_VIOLATION.code());
}
- @Test
- public void testGlobalTopicAndPartitionMetrics() throws Exception {
- ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
- ReplicationControlManager replicationControl = ctx.replicationControl;
- CreateTopicsRequestData request = new CreateTopicsRequestData();
- request.topics().add(new CreatableTopic().setName("foo").
- setNumPartitions(1).setReplicationFactor((short) -1));
-
- ctx.registerBrokers(0, 1, 2);
- ctx.unfenceBrokers(0, 1, 2);
-
- List<Uuid> topicsToDelete = new ArrayList<>();
-
- ControllerResult<CreateTopicsResponseData> result =
- replicationControl.createTopics(request,
Collections.singleton("foo"));
- topicsToDelete.add(result.response().topics().find("foo").topicId());
-
- RecordTestUtils.replayAll(replicationControl, result.records());
- assertEquals(1, ctx.metrics.globalTopicsCount());
-
- request = new CreateTopicsRequestData();
- request.topics().add(new CreatableTopic().setName("bar").
- setNumPartitions(1).setReplicationFactor((short) -1));
- request.topics().add(new CreatableTopic().setName("baz").
- setNumPartitions(2).setReplicationFactor((short) -1));
- result = replicationControl.createTopics(request,
- new HashSet<>(Arrays.asList("bar", "baz")));
- RecordTestUtils.replayAll(replicationControl, result.records());
- assertEquals(3, ctx.metrics.globalTopicsCount());
- assertEquals(4, ctx.metrics.globalPartitionCount());
-
- topicsToDelete.add(result.response().topics().find("baz").topicId());
- ControllerResult<Map<Uuid, ApiError>> deleteResult =
replicationControl.deleteTopics(topicsToDelete);
- RecordTestUtils.replayAll(replicationControl, deleteResult.records());
- assertEquals(1, ctx.metrics.globalTopicsCount());
- assertEquals(1, ctx.metrics.globalPartitionCount());
-
- Uuid topicToDelete = result.response().topics().find("bar").topicId();
- deleteResult =
replicationControl.deleteTopics(Collections.singletonList(topicToDelete));
- RecordTestUtils.replayAll(replicationControl, deleteResult.records());
- assertEquals(0, ctx.metrics.globalTopicsCount());
- assertEquals(0, ctx.metrics.globalPartitionCount());
- }
-
- @Test
- public void testOfflinePartitionAndReplicaImbalanceMetrics() throws
Exception {
- ReplicationControlTestContext ctx = new
ReplicationControlTestContext();
- ReplicationControlManager replicationControl = ctx.replicationControl;
- ctx.registerBrokers(0, 1, 2, 3);
- ctx.unfenceBrokers(0, 1, 2, 3);
-
- CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
- new int[] {0, 2}, new int[] {0, 1}});
-
- CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][] {
- new int[] {0, 1, 2}, new int[] {1, 2, 3}, new int[] {1, 2, 0}});
-
- ControllerResult<Void> result = replicationControl.unregisterBroker(0);
- ctx.replay(result.records());
-
- // All partitions should still be online after unregistering broker 0
- assertEquals(0, ctx.metrics.offlinePartitionCount());
- // Three partitions should not have their preferred (first) replica 0
- assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount());
-
- result = replicationControl.unregisterBroker(1);
- ctx.replay(result.records());
-
- // After unregistering broker 1, 1 partition for topic foo should go
offline
- assertEquals(1, ctx.metrics.offlinePartitionCount());
- // All five partitions should not have their preferred (first) replica
at this point
- assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
-
- result = replicationControl.unregisterBroker(2);
- ctx.replay(result.records());
-
- // After unregistering broker 2, the last partition for topic foo
should go offline
- // and 2 partitions for topic zar should go offline
- assertEquals(4, ctx.metrics.offlinePartitionCount());
-
- result = replicationControl.unregisterBroker(3);
- ctx.replay(result.records());
-
- // After unregistering broker 3 the last partition for topic zar
should go offline
- assertEquals(5, ctx.metrics.offlinePartitionCount());
-
- // Deleting topic foo should bring the offline partition count down to
3
- ArrayList<ApiMessageAndVersion> records = new ArrayList<>();
- replicationControl.deleteTopic(foo.topicId(), records);
- ctx.replay(records);
-
- assertEquals(3, ctx.metrics.offlinePartitionCount());
-
- // Deleting topic zar should bring the offline partition count down to 0
- records = new ArrayList<>();
- replicationControl.deleteTopic(zar.topicId(), records);
- ctx.replay(records);
-
- assertEquals(0, ctx.metrics.offlinePartitionCount());
- }
-
@Test
public void testValidateNewTopicNames() {
Map<String, ApiError> topicErrors = new HashMap<>();
@@ -2072,7 +1933,6 @@ public class ReplicationControlManagerTest {
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2,
1}}).topicId();
assertTrue(replication.arePartitionLeadersImbalanced());
- assertEquals(2, ctx.metrics.preferredReplicaImbalanceCount());
ctx.unfenceBrokers(1);
@@ -2104,7 +1964,6 @@ public class ReplicationControlManagerTest {
.setLeader(1);
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord,
(short) 0)), balanceResult.records());
assertTrue(replication.arePartitionLeadersImbalanced());
- assertEquals(1, ctx.metrics.preferredReplicaImbalanceCount());
assertFalse(balanceResult.response());
ctx.unfenceBrokers(0);
@@ -2137,7 +1996,6 @@ public class ReplicationControlManagerTest {
.setLeader(0);
assertEquals(asList(new ApiMessageAndVersion(expectedChangeRecord,
(short) 0)), balanceResult.records());
assertFalse(replication.arePartitionLeadersImbalanced());
- assertEquals(0, ctx.metrics.preferredReplicaImbalanceCount());
assertFalse(balanceResult.response());
}