This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 06a5e258e49 KAFKA-18232: Add share group state topic prune metrics. (#18174)
06a5e258e49 is described below

commit 06a5e258e498630c51ccf76d18b9c8081effd703
Author: Sushant Mahajan <[email protected]>
AuthorDate: Mon Jan 20 20:47:15 2025 +0530

    KAFKA-18232: Add share group state topic prune metrics. (#18174)
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
---
 .../coordinator/share/ShareCoordinatorService.java |  4 ++
 .../share/metrics/ShareCoordinatorMetrics.java     | 31 +++++++++
 .../share/ShareCoordinatorServiceTest.java         | 81 ++++++++++++++++++++--
 .../share/metrics/ShareCoordinatorMetricsTest.java | 50 ++++++++++++-
 4 files changed, 157 insertions(+), 9 deletions(-)

diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
index a006edd7e64..05fcffb43c6 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java
@@ -319,6 +319,10 @@ public class ShareCoordinatorService implements ShareCoordinator {
                             fut.completeExceptionally(exp);
                             return;
                         }
+                        shareCoordinatorMetrics.recordPrune(
+                            off,
+                            tp
+                        );
                         fut.complete(null);
                         // Best effort prevention of issuing duplicate delete calls.
                         lastPrunedOffsets.put(tp, off);
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
index 9076fdcbb8f..197b1d76e0b 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetrics.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
 import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
@@ -46,6 +47,8 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
 
     public static final String SHARE_COORDINATOR_WRITE_SENSOR_NAME = "ShareCoordinatorWrite";
     public static final String SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME = "ShareCoordinatorWriteLatency";
+    public static final String SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME = "ShareCoordinatorStateTopicPruneSensorName";
+    private Map<TopicPartition, ShareGroupPruneMetrics> pruneMetrics = new ConcurrentHashMap<>();
 
     /**
      * Global sensors. These are shared across all metrics shards.
@@ -92,6 +95,7 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
             SHARE_COORDINATOR_WRITE_SENSOR_NAME,
             SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME
         ).forEach(metrics::removeSensor);
+        pruneMetrics.values().forEach(v -> metrics.removeSensor(v.pruneSensor.name()));
     }
 
     @Override
@@ -153,4 +157,31 @@ public class ShareCoordinatorMetrics extends CoordinatorMetrics implements AutoC
             globalSensors.get(sensorName).record();
         }
     }
+
+    public void recordPrune(double value, TopicPartition tp) {
+        pruneMetrics.computeIfAbsent(tp, k -> new ShareGroupPruneMetrics(tp))
+            .pruneSensor.record(value);
+    }
+
+    private class ShareGroupPruneMetrics {
+        private final Sensor pruneSensor;
+
+        ShareGroupPruneMetrics(TopicPartition tp) {
+            String sensorNameSuffix = tp.toString();
+            Map<String, String> tags = Map.of(
+                "topic", tp.topic(),
+                "partition", Integer.toString(tp.partition())
+            );
+
+            pruneSensor = metrics.sensor(SHARE_COORDINATOR_STATE_TOPIC_PRUNE_SENSOR_NAME + sensorNameSuffix);
+
+            pruneSensor.add(
+                metrics.metricName("last-pruned-offset",
+                    METRICS_GROUP,
+                    "The offset at which the share-group state topic was last pruned.",
+                    tags),
+                new Value()
+            );
+        }
+    }
 }
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
index e7ace5fd2b9..7f7776cd93e 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorServiceTest.java
@@ -52,6 +52,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -971,11 +972,13 @@ class ShareCoordinatorServiceTest {
             CompletableFuture.completedFuture(Optional.of(11L))
         );
 
+        Metrics metrics = new Metrics();
+
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1007,6 +1010,10 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(2))
             .deleteRecords(any(), anyLong());
+
+        checkMetrics(metrics);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+
         service.shutdown();
     }
 
@@ -1058,11 +1065,13 @@ class ShareCoordinatorServiceTest {
             CompletableFuture.completedFuture(Optional.of(21L))
         );
 
+        Metrics metrics = new Metrics();
+
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1094,6 +1103,11 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(4))
             .deleteRecords(any(), anyLong());
+
+        checkMetrics(metrics);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1, false);
+
         service.shutdown();
     }
 
@@ -1111,11 +1125,13 @@ class ShareCoordinatorServiceTest {
             any()
         )).thenReturn(CompletableFuture.failedFuture(Errors.UNKNOWN_SERVER_ERROR.exception()));
 
+        Metrics metrics = new Metrics();
+
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1139,6 +1155,10 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(0))
             .deleteRecords(any(), anyLong());
+
+        checkMetrics(metrics);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false);
+
         service.shutdown();
     }
 
@@ -1156,11 +1176,13 @@ class ShareCoordinatorServiceTest {
             any()
         )).thenReturn(CompletableFuture.completedFuture(Optional.of(20L)));
 
+        Metrics metrics = new Metrics();
+
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1184,6 +1206,9 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(1))
             .deleteRecords(any(), eq(20L));
+
+        checkMetrics(metrics);
+
         service.shutdown();
     }
 
@@ -1201,11 +1226,12 @@ class ShareCoordinatorServiceTest {
             any()
         )).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
 
+        Metrics metrics = new Metrics();
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1229,6 +1255,10 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(0))
             .deleteRecords(any(), anyLong());
+
+        checkMetrics(metrics);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, false);
+
         service.shutdown();
     }
 
@@ -1257,11 +1287,12 @@ class ShareCoordinatorServiceTest {
             CompletableFuture.completedFuture(Optional.of(10L))
         );
 
+        Metrics metrics = new Metrics();
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1293,6 +1324,10 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(1))
             .deleteRecords(any(), anyLong());
+
+        checkMetrics(metrics);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+
         service.shutdown();
     }
 
@@ -1325,11 +1360,13 @@ class ShareCoordinatorServiceTest {
             CompletableFuture.completedFuture(Optional.of(10L))
         );
 
+        Metrics metrics = new Metrics();
+
         ShareCoordinatorService service = spy(new ShareCoordinatorService(
             new LogContext(),
             ShareCoordinatorTestConfig.testConfig(),
             runtime,
-            new ShareCoordinatorMetrics(),
+            new ShareCoordinatorMetrics(metrics),
             time,
             timer,
             writer
@@ -1361,6 +1398,36 @@ class ShareCoordinatorServiceTest {
 
         verify(writer, times(2))
             .deleteRecords(any(), anyLong());
+
+        checkMetrics(metrics);
+        checkPruneMetric(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, true);
+
         service.shutdown();
     }
+
+    private void checkMetrics(Metrics metrics) {
+        Set<MetricName> usualMetrics = new HashSet<>(Arrays.asList(
+            metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP),
+            metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP),
+            metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP),
+            metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP)
+        ));
+
+        usualMetrics.forEach(metric -> assertTrue(metrics.metrics().containsKey(metric)));
+    }
+
+    private void checkPruneMetric(Metrics metrics, String topic, int partition, boolean checkPresence) {
+        boolean isPresent = metrics.metrics().containsKey(
+            metrics.metricName(
+                "last-pruned-offset",
+                ShareCoordinatorMetrics.METRICS_GROUP,
+                "The offset at which the share-group state topic was last pruned.",
+                Map.of(
+                    "topic", topic,
+                    "partition", Integer.toString(partition)
+                )
+            )
+        );
+        assertEquals(checkPresence, isPresent);
+    }
 }
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
index 80b2889ef93..39ca697f069 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/metrics/ShareCoordinatorMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.share.metrics;
 
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
@@ -27,10 +28,12 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 
 import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME;
 import static org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareCoordinatorMetricsTest {
@@ -46,14 +49,21 @@ public class ShareCoordinatorMetricsTest {
             metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP)
         ));
 
-        ShareCoordinatorMetrics ignored = new ShareCoordinatorMetrics(metrics);
+        ShareCoordinatorMetrics coordMetrics = new ShareCoordinatorMetrics(metrics);
         for (MetricName metricName : expectedMetrics) {
             assertTrue(metrics.metrics().containsKey(metricName));
         }
+
+        assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
+        coordMetrics.recordPrune(
+            10.0,
+            new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)
+        );
+        assertTrue(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
     }
 
     @Test
-    public void testGlobalSensors() {
+    public void testShardGlobalSensors() {
         MockTime time = new MockTime();
         Metrics metrics = new Metrics(time);
         ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
@@ -71,7 +81,43 @@ public class ShareCoordinatorMetricsTest {
         assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0);
     }
 
+    @Test
+    public void testCoordinatorGlobalSensors() {
+        MockTime time = new MockTime();
+        Metrics metrics = new Metrics(time);
+        ShareCoordinatorMetrics coordinatorMetrics = new ShareCoordinatorMetrics(metrics);
+
+        coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+        assertMetricValue(metrics, metrics.metricName("write-rate", ShareCoordinatorMetrics.METRICS_GROUP), 1.0 / 30);  //sampled stats
+        assertMetricValue(metrics, metrics.metricName("write-total", ShareCoordinatorMetrics.METRICS_GROUP), 1.0);
+
+        coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 20);
+        coordinatorMetrics.record(SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME, 30);
+        assertMetricValue(metrics, metrics.metricName("write-latency-avg", ShareCoordinatorMetrics.METRICS_GROUP), 50.0 / 2);
+        assertMetricValue(metrics, metrics.metricName("write-latency-max", ShareCoordinatorMetrics.METRICS_GROUP), 30.0);
+
+
+        assertFalse(metrics.metrics().containsKey(pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)));
+        coordinatorMetrics.recordPrune(
+            10.0,
+            new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1)
+        );
+        assertMetricValue(metrics, pruneMetricName(metrics, Topic.SHARE_GROUP_STATE_TOPIC_NAME, 1), 10.0);
+    }
+
     private void assertMetricValue(Metrics metrics, MetricName metricName, double val) {
         assertEquals(val, metrics.metric(metricName).metricValue());
     }
+
+    private MetricName pruneMetricName(Metrics metrics, String topic, Integer partition) {
+        return metrics.metricName(
+            "last-pruned-offset",
+            ShareCoordinatorMetrics.METRICS_GROUP,
+            "The offset at which the share-group state topic was last pruned.",
+            Map.of(
+                "topic", topic,
+                "partition", Integer.toString(partition)
+            )
+        );
+    }
 }

Reply via email to