This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 06a5e258e49 KAFKA-18232: Add share group state topic prune metrics. (#18174)
06a5e258e49 is described below
commit 06a5e258e498630c51ccf76d18b9c8081effd703
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon Jan 20 20:47:15 2025 +0530
KAFKA-18232: Add share group state topic prune metrics. (#18174)
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
---
.../coordinator/share/ShareCoordinatorService.java | 4 ++
.../share/metrics/ShareCoordinatorMetrics.java | 31 +++++++++
.../share/ShareCoordinatorServiceTest.java | 81 ++++++++++++++++++++--
.../share/metrics/ShareCoordinatorMetricsTest.java | 50 ++++++++++++-
4 files changed, 157 insertions(+), 9 deletions(-)
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index a006edd7e64..05fcffb43c6 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -319,6 +319,10 @@ public class ShareCoordinatorService implements
ShareCoordinator {
fut.completeExceptionally(exp);
return;
}
+ shareCoordinatorMetrics.recordPrune(
+ off,
+ tp
+ );
fut.complete(null);
// Best effort prevention of issuing duplicate delete
calls.
lastPrunedOffsets.put(tp, off);
diff --git
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
index 9076fdcbb8f..197b1d76e0b 100644
---
a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
+++
b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
@@ -46,6 +47,8 @@ public class ShareCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME =
"ShareCoordinatorWrite";
public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME =
"ShareCoordinatorWriteLatency";
+    /**
+     * Prefix of the per-partition prune sensor names. The string form of the
+     * partition is appended to it when a partition's sensor is created.
+     */
+    public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName";
+    /**
+     * Prune metrics keyed by partition. An entry is created the first time a
+     * prune is recorded for that partition. The reference is never reassigned,
+     * so it is declared final.
+     */
+    private final Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new ConcurrentHashMap<>();
/**
* Global sensors. These are shared across all metrics shards.
@@ -92,6 +95,7 @@ public class ShareCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
SHARE_COORDINATOR_WRITE_SENSOR_NAME,
SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME
).forEach(metrics::removeSensor);
+ pruneMetrics.values().forEach(v ->
metrics.removeSensor(v.pruneSensor.name()));
}
@Override
@@ -153,4 +157,31 @@ public class ShareCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
globalSensors.get(sensorName).record();
}
}
+
+ public void recordPrune(double value, TopicPartition tp) {
+ pruneMetrics.computeIfAbsent(tp, k -> new ShareGroupPruneMetrics(tp))
+ .pruneSensor.record(value);
+ }
+
+    /**
+     * Holds the prune sensor of a single partition. Constructing an instance
+     * registers the sensor and its "last-pruned-offset" metric with the
+     * enclosing instance's metrics registry.
+     */
+    private class ShareGroupPruneMetrics {
+        private final Sensor pruneSensor;
+
+        /**
+         * @param tp the partition whose prune sensor is created; its topic and
+         *           partition number become the metric tags
+         */
+        ShareGroupPruneMetrics(TopicPartition tp) {
+            // The partition's string form is appended so each partition gets its own sensor name.
+            String sensorName = SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + tp.toString();
+            Map<String, String> partitionTags = Map.of(
+                "topic", tp.topic(),
+                "partition", Integer.toString(tp.partition())
+            );
+
+            Sensor sensor = metrics.sensor(sensorName);
+            sensor.add(
+                metrics.metricName(
+                    "last-pruned-offset",
+                    METRICS_GROUP,
+                    "The offset at which the share-group state topic was last pruned.",
+                    partitionTags
+                ),
+                new Value()
+            );
+            this.pruneSensor = sensor;
+        }
+    }
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index e7ace5fd2b9..7f7776cd93e 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -52,6 +52,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -971,11 +972,13 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(11L))
);
+ Metrics metrics = new Metrics();
+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1007,6 +1010,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(2))
.deleteRecords(any(), anyLong());
+
+ checkMetrics(metrics);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+
service.shutdown();
}
@@ -1058,11 +1065,13 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(21L))
);
+ Metrics metrics = new Metrics();
+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1094,6 +1103,11 @@ class ShareCoordinatorServiceTest {
verify(writer, times(4))
.deleteRecords(any(), anyLong());
+
+ checkMetrics(metrics);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1,
false);
+
service.shutdown();
}
@@ -1111,11 +1125,13 @@ class ShareCoordinatorServiceTest {
any()
)).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
+ Metrics metrics = new Metrics();
+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1139,6 +1155,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(0))
.deleteRecords(any(), anyLong());
+
+ checkMetrics(metrics);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0,
false);
+
service.shutdown();
}
@@ -1156,11 +1176,13 @@ class ShareCoordinatorServiceTest {
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
+ Metrics metrics = new Metrics();
+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1184,6 +1206,9 @@ class ShareCoordinatorServiceTest {
verify(writer, times(1))
.deleteRecords(any(), eq(20L));
+
+ checkMetrics(metrics);
+
service.shutdown();
}
@@ -1201,11 +1226,12 @@ class ShareCoordinatorServiceTest {
any()
)).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
+ Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1229,6 +1255,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(0))
.deleteRecords(any(), anyLong());
+
+ checkMetrics(metrics);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0,
false);
+
service.shutdown();
}
@@ -1257,11 +1287,12 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(10L))
);
+ Metrics metrics = new Metrics();
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1293,6 +1324,10 @@ class ShareCoordinatorServiceTest {
verify(writer, times(1))
.deleteRecords(any(), anyLong());
+
+ checkMetrics(metrics);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+
service.shutdown();
}
@@ -1325,11 +1360,13 @@ class ShareCoordinatorServiceTest {
CompletableFuture.completedFuture(Optional.of(10L))
);
+ Metrics metrics = new Metrics();
+
ShareCoordinatorService service = spy(new ShareCoordinatorService(
new LogContext(),
ShareCoordinatorTestConfig.testConfig(),
runtime,
- new ShareCoordinatorMetrics(),
+ new ShareCoordinatorMetrics(metrics),
time,
timer,
writer
@@ -1361,6 +1398,36 @@ class ShareCoordinatorServiceTest {
verify(writer, times(2))
.deleteRecords(any(), anyLong());
+
+ checkMetrics(metrics);
+ checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+
service.shutdown();
}
+
+    /**
+     * Asserts that the four write metrics (latency avg/max, rate and total)
+     * are present in the given registry under the share coordinator group.
+     */
+    private void checkMetrics(Metrics metrics) {
+        List<String> usualMetricNames = Arrays.asList(
+            "write-latency-avg",
+            "write-latency-max",
+            "write-rate",
+            "write-total"
+        );
+
+        for (String name : usualMetricNames) {
+            MetricName metric = metrics.metricName(name, ShareCoordinatorMetrics.METRICS_GROUP);
+            assertTrue(metrics.metrics().containsKey(metric));
+        }
+    }
+
+    /**
+     * Asserts that the "last-pruned-offset" metric tagged with the given topic
+     * and partition is present in the registry exactly when
+     * {@code checkPresence} is true, and absent otherwise.
+     */
+    private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) {
+        Map<String, String> tags = Map.of(
+            "topic", topic,
+            "partition", Integer.toString(partition)
+        );
+        MetricName lastPrunedOffset = metrics.metricName(
+            "last-pruned-offset",
+            ShareCoordinatorMetrics.METRICS_GROUP,
+            "The offset at which the share-group state topic was last pruned.",
+            tags
+        );
+
+        assertEquals(checkPresence, metrics.metrics().containsKey(lastPrunedOffset));
+    }
}
diff --git
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
index 80b2889ef93..39ca697f069 100644
---
a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
+++
b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.share.metrics;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
@@ -27,10 +28,12 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Map;
import static
org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME;
import static
org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareCoordinatorMetricsTest {
@@ -46,14 +49,21 @@ public class ShareCoordinatorMetricsTest {
metrics.metricName("write-latency-max",
ShareCoordinatorMetrics.METRICS_GROUP)
));
- ShareCoordinatorMetrics ignored = new ShareCoordinatorMetrics(metrics);
+ ShareCoordinatorMetrics coordMetrics = new
ShareCoordinatorMetrics(metrics);
for (MetricName metricName : expectedMetrics) {
assertTrue(metrics.metrics().containsKey(metricName));
}
+
+ assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics,
Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
+ coordMetrics.recordPrune(
+ 10.0,
+ new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)
+ );
+ assertTrue(metrics.metrics().containsKey(pruneMetricName(metrics,
Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
}
@Test
- public void testGlobalSensors() {
+ public void testShardGlobalSensors() {
MockTime time = new MockTime();
Metrics metrics = new Metrics(time);
ShareCoordinatorMetrics coordinatorMetrics = new
ShareCoordinatorMetrics(metrics);
@@ -71,7 +81,43 @@ public class ShareCoordinatorMetricsTest {
assertMetricValue(metrics, metrics.metricName("write-latency-max",
ShareCoordinatorMetrics.METRICS_GROUP), 30.0);
}
+    /**
+     * Records on the coordinator-level sensors directly and checks the
+     * resulting metric values: the write rate and total after one write, the
+     * latency average and maximum after two samples, and the last pruned
+     * offset, which must not be registered until the first prune is recorded.
+     */
+    @Test
+    public void testCoordinatorGlobalSensors() {
+        Metrics metrics = new Metrics(new MockTime());
+        ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
+
+        // A single write.
+        coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+        assertMetricValue(metrics, metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), 1.0 / 30); //sampled stats
+        assertMetricValue(metrics, metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP), 1.0);
+
+        // Two latency samples: 20 and 30.
+        coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 20);
+        coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 30);
+        assertMetricValue(metrics, metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), 50.0 / 2);
+        assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0);
+
+        // The prune metric only appears once a prune has been recorded for the partition.
+        TopicPartition prunedPartition = new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
+        MetricName pruneMetric = pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1);
+        assertFalse(metrics.metrics().containsKey(pruneMetric));
+
+        coordinatorMetrics.recordPrune(10.0, prunedPartition);
+        assertMetricValue(metrics, pruneMetric, 10.0);
+    }
+
// Asserts that the metric registered under metricName in the given registry
// currently reports exactly val as its value.
private void assertMetricValue(Metrics metrics, MetricName metricName,
double val) {
assertEquals(val, metrics.metric(metricName).metricValue());
}
+
+    /**
+     * Builds the name of the "last-pruned-offset" metric for the given topic
+     * and partition, using the same group, description and tag keys that
+     * {@code ShareCoordinatorMetrics} uses when it registers the metric.
+     */
+    private MetricName pruneMetricName(Metrics metrics, String topic, Integer partition) {
+        Map<String, String> tags = Map.of(
+            "topic", topic,
+            "partition", Integer.toString(partition)
+        );
+        return metrics.metricName(
+            "last-pruned-offset",
+            ShareCoordinatorMetrics.METRICS_GROUP,
+            "The offset at which the share-group state topic was last pruned.",
+            tags
+        );
+    }
}