This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1c582a4a357 KAFKA-18954: Add ELR election rate metric (#19180)
1c582a4a357 is described below
commit 1c582a4a357470f9f828a287a4bf9bb43cde82cc
Author: Calvin Liu <[email protected]>
AuthorDate: Thu Mar 20 15:37:49 2025 -0700
KAFKA-18954: Add ELR election rate metric (#19180)
Add a metric to track the number of elections that are done using ELR.
https://issues.apache.org/jira/browse/KAFKA-18954
Reviewers: Colin P. McCabe <[email protected]>, Justine Olshan
<[email protected]>
---
docs/ops.html | 5 +++++
.../controller/metrics/ControllerMetadataMetrics.java | 10 ++++++++++
.../controller/metrics/ControllerMetricsChanges.java | 8 ++++++++
.../apache/kafka/metadata/PartitionRegistration.java | 4 ++++
.../metrics/ControllerMetadataMetricsTest.java | 19 ++++++++++++++++++-
.../kafka/metadata/PartitionRegistrationTest.java | 6 ++++++
6 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/docs/ops.html b/docs/ops.html
index cf662735888..4d6d60c6a4d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1528,6 +1528,11 @@ NodeId DirectoryId LogEndOffset Lag
LastFetchTimestamp LastCaughtUpTi
<td>kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec</td>
<td>0</td>
</tr>
+ <tr>
+ <td>Election from Eligible leader replicas rate</td>
+
<td>kafka.controller:type=ControllerStats,name=ElectionFromEligibleLeaderReplicasPerSec</td>
+ <td>0</td>
+ </tr>
<tr>
<td>Is controller active on broker</td>
<td>kafka.controller:type=KafkaController,name=ActiveControllerCount</td>
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 938bf1ed48a..46854206331 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -55,6 +55,8 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
"KafkaController", "MetadataErrorCount");
private static final MetricName UNCLEAN_LEADER_ELECTIONS_PER_SEC =
getMetricName(
"ControllerStats", "UncleanLeaderElectionsPerSec");
+ private static final MetricName
ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC = getMetricName(
+ "ControllerStats", "ElectionFromEligibleLeaderReplicasPerSec");
private static final MetricName IGNORED_STATIC_VOTERS = getMetricName(
"KafkaController", "IgnoredStaticVoters");
@@ -67,6 +69,7 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
private final AtomicInteger preferredReplicaImbalanceCount = new
AtomicInteger(0);
private final AtomicInteger metadataErrorCount = new AtomicInteger(0);
private Optional<Meter> uncleanLeaderElectionMeter = Optional.empty();
+ private Optional<Meter> electionFromEligibleLeaderReplicasMeter =
Optional.empty();
private final AtomicBoolean ignoredStaticVoters = new AtomicBoolean(false);
/**
@@ -120,6 +123,8 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
}));
registry.ifPresent(r -> uncleanLeaderElectionMeter =
Optional.of(registry.get().newMeter(UNCLEAN_LEADER_ELECTIONS_PER_SEC,
"elections", TimeUnit.SECONDS)));
+ registry.ifPresent(r -> electionFromEligibleLeaderReplicasMeter =
+
Optional.of(registry.get().newMeter(ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
"elections", TimeUnit.SECONDS)));
registry.ifPresent(r -> r.newGauge(IGNORED_STATIC_VOTERS, new
Gauge<Integer>() {
@Override
@@ -213,6 +218,10 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
this.uncleanLeaderElectionMeter.ifPresent(m -> m.mark(count));
}
+ public void updateElectionFromEligibleLeaderReplicasCount(int count) {
+ this.electionFromEligibleLeaderReplicasMeter.ifPresent(m ->
m.mark(count));
+ }
+
public void setIgnoredStaticVoters(boolean ignored) {
ignoredStaticVoters.set(ignored);
}
@@ -232,6 +241,7 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
PREFERRED_REPLICA_IMBALANCE_COUNT,
METADATA_ERROR_COUNT,
UNCLEAN_LEADER_ELECTIONS_PER_SEC,
+ ELECTION_FROM_ELIGIBLE_LEADER_REPLICAS_PER_SEC,
IGNORED_STATIC_VOTERS
).forEach(r::removeMetric));
}
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
index 7a4fef9182e..843f779826e 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetricsChanges.java
@@ -48,6 +48,7 @@ class ControllerMetricsChanges {
private int offlinePartitionsChange = 0;
private int partitionsWithoutPreferredLeaderChange = 0;
private int uncleanLeaderElection = 0;
+ private int electionFromElrCounter = 0;
public int fencedBrokersChange() {
return fencedBrokersChange;
@@ -132,6 +133,9 @@ class ControllerMetricsChanges {
if (!PartitionRegistration.electionWasClean(next.leader, prevIsr,
prevElr)) {
uncleanLeaderElection++;
}
+ if (PartitionRegistration.electionFromElr(next.leader, prevElr)) {
+ electionFromElrCounter++;
+ }
}
globalPartitionsChange += delta(wasPresent, isPresent);
offlinePartitionsChange += delta(wasOffline, isOffline);
@@ -164,5 +168,9 @@ class ControllerMetricsChanges {
metrics.updateUncleanLeaderElection(uncleanLeaderElection);
uncleanLeaderElection = 0;
}
+ if (electionFromElrCounter > 0) {
+
metrics.updateElectionFromEligibleLeaderReplicasCount(electionFromElrCounter);
+ electionFromElrCounter = 0;
+ }
}
}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
index 3891b624226..808c9809352 100644
---
a/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
+++
b/metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java
@@ -169,6 +169,10 @@ public class PartitionRegistration {
return newLeader == NO_LEADER || Replicas.contains(isr, newLeader) ||
Replicas.contains(elr, newLeader);
}
+ public static boolean electionFromElr(int newLeader, int[] elr) {
+ return Replicas.contains(elr, newLeader);
+ }
+
private static List<Uuid> checkDirectories(PartitionRecord record) {
if (record.directories() != null && !record.directories().isEmpty() &&
record.replicas().size() != record.directories().size()) {
throw new InvalidReplicaDirectoriesException(record);
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
index 42801c510f1..dde8be43c9a 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/metrics/ControllerMetadataMetricsTest.java
@@ -49,7 +49,8 @@ public class ControllerMetadataMetricsTest {
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
"kafka.controller:type=KafkaController,name=IgnoredStaticVoters",
-
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec"
+
"kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec",
+
"kafka.controller:type=ControllerStats,name=ElectionFromEligibleLeaderReplicasPerSec"
)));
}
ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry,
"KafkaController",
@@ -192,6 +193,22 @@ public class ControllerMetadataMetricsTest {
}
}
+ @SuppressWarnings("LocalVariableName")
+ @Test
+ public void testUpdateElectionFromEligibleLeaderReplicasCount() {
+ MetricsRegistry registry = new MetricsRegistry();
+ try (ControllerMetadataMetrics metrics = new
ControllerMetadataMetrics(Optional.of(registry))) {
+ Meter ElectionFromEligibleLeaderReplicasPerSec = (Meter) registry
+ .allMetrics()
+ .get(metricName("ControllerStats",
"ElectionFromEligibleLeaderReplicasPerSec"));
+ assertEquals(0, ElectionFromEligibleLeaderReplicasPerSec.count());
+ metrics.updateElectionFromEligibleLeaderReplicasCount(2);
+ assertEquals(2, ElectionFromEligibleLeaderReplicasPerSec.count());
+ } finally {
+ registry.shutdown();
+ }
+ }
+
@Test
public void testIgnoredStaticVoters() {
MetricsRegistry registry = new MetricsRegistry();
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
index 585dc842522..c0de7d2125d 100644
---
a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
+++
b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java
@@ -63,6 +63,12 @@ public class PartitionRegistrationTest {
assertTrue(PartitionRegistration.electionWasClean(3, new int[]{}, new
int[]{1, 2, 3}));
}
+ @Test
+ public void testEligibleLeaderReplicasElection() {
+ assertTrue(PartitionRegistration.electionFromElr(1, new int[]{1, 2}));
+ assertFalse(PartitionRegistration.electionFromElr(1, new int[]{0, 2}));
+ }
+
@Test
public void testPartitionControlInfoMergeAndDiff() {
PartitionRegistration a = new PartitionRegistration.Builder().