This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 815dd93e2f9 MINOR: Invoke share group rebalance sensor. (#20006)
815dd93e2f9 is described below

commit 815dd93e2f9936bb9a2b738b67b94e270ab0e48a
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Sat Jun 21 13:05:19 2025 +0530

    MINOR: Invoke share group rebalance sensor. (#20006)
    
    * The share group rebalance metric was not being recorded at the
    point where the share group epoch is bumped.
    * This PR solves the issue.
    * The metric names have been updated
    (s/rebalance-rate/share-group-rebalance-rate/,
    s/rebalance-count/share-group-rebalance-count/) and the `protocol`
    tag has been removed from both metrics.
    * Updated tests in `GroupMetadataManagerTest` and
    `GroupCoordinatorMetricsTest`
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.java      |  2 ++
 .../group/metrics/GroupCoordinatorMetrics.java       | 10 ++++------
 .../coordinator/group/GroupMetadataManagerTest.java  |  5 +++++
 .../group/metrics/GroupCoordinatorMetricsTest.java   | 20 ++++++--------------
 4 files changed, 17 insertions(+), 20 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 20b9337fb41..9c9500485b1 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -248,6 +248,7 @@ import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
+import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
 import static 
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
@@ -2610,6 +2611,7 @@ public class GroupMetadataManager {
                 groupEpoch += 1;
                 records.add(newShareGroupEpochRecord(groupId, groupEpoch, 
groupMetadataHash));
                 log.info("[GroupId {}] Bumped group epoch to {} with metadata 
hash {}.", groupId, groupEpoch, groupMetadataHash);
+                metrics.record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
             }
 
             group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
index dd21570b654..9d05727d2e9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
@@ -297,14 +297,12 @@ public class GroupCoordinatorMetrics extends 
CoordinatorMetrics implements AutoC
 
         Sensor shareGroupRebalanceSensor = 
metrics.sensor(SHARE_GROUP_REBALANCES_SENSOR_NAME);
         shareGroupRebalanceSensor.add(new Meter(
-            metrics.metricName("rebalance-rate",
+            metrics.metricName("share-group-rebalance-rate",
                 METRICS_GROUP,
-                "The rate of share group rebalances",
-                SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString()),
-            metrics.metricName("rebalance-count",
+                "The rate of share group rebalances"),
+            metrics.metricName("share-group-rebalance-count",
                 METRICS_GROUP,
-                "The total number of share group rebalances",
-                SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
+                "The total number of share group rebalances")));
         
         Sensor streamsGroupRebalanceSensor = 
metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
         streamsGroupRebalanceSensor.add(new Meter(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f47b8e70d8..1cf13c70490 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -203,6 +203,7 @@ import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
 import static 
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
 import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
+import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -19699,6 +19700,7 @@ public class GroupMetadataManagerTest {
                     .setMemberId(Uuid.randomUuid().toString())
                     .setMemberEpoch(1)
                     .setSubscribedTopicNames(List.of("foo", "bar"))));
+        verify(context.metrics, 
times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
     }
 
     @Test
@@ -19785,6 +19787,7 @@ public class GroupMetadataManagerTest {
                 .setGroupId(groupId)
                 .setMemberEpoch(0)
                 .setSubscribedTopicNames(List.of("foo", "bar"))));
+        verify(context.metrics, 
times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
     }
 
     @Test
@@ -22306,6 +22309,7 @@ public class GroupMetadataManagerTest {
         );
 
         assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)), 
context.groupMetadataManager.initializedShareGroupPartitions(groupId));
+        verify(context.metrics, 
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
     }
 
     @Test
@@ -22415,6 +22419,7 @@ public class GroupMetadataManagerTest {
             2,
             true
         );
+        verify(context.metrics, 
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
     }
 
     @Test
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index fa285b5bbf6..f4d2ad325b1 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -104,14 +104,8 @@ public class GroupCoordinatorMetricsTest {
                 "group-count",
                 GroupCoordinatorMetrics.METRICS_GROUP,
                 Map.of("protocol", Group.GroupType.SHARE.toString())),
-            metrics.metricName(
-                "rebalance-rate",
-                GroupCoordinatorMetrics.METRICS_GROUP,
-                Map.of("protocol", Group.GroupType.SHARE.toString())),
-            metrics.metricName(
-                "rebalance-count",
-                GroupCoordinatorMetrics.METRICS_GROUP,
-                Map.of("protocol", Group.GroupType.SHARE.toString())),
+            metrics.metricName("share-group-rebalance-rate", 
GroupCoordinatorMetrics.METRICS_GROUP),
+            metrics.metricName("share-group-rebalance-count", 
GroupCoordinatorMetrics.METRICS_GROUP),
             metrics.metricName(
                 "share-group-count",
                 GroupCoordinatorMetrics.METRICS_GROUP,
@@ -304,16 +298,14 @@ public class GroupCoordinatorMetricsTest {
 
         shard.record(SHARE_GROUP_REBALANCES_SENSOR_NAME, 50);
         assertMetricValue(metrics, metrics.metricName(
-            "rebalance-rate",
+            "share-group-rebalance-rate",
             GroupCoordinatorMetrics.METRICS_GROUP,
-            "The rate of share group rebalances",
-            "protocol", "share"
+            "The rate of share group rebalances"
         ), 5.0 / 3.0);
         assertMetricValue(metrics, metrics.metricName(
-            "rebalance-count",
+            "share-group-rebalance-count",
             GroupCoordinatorMetrics.METRICS_GROUP,
-            "The total number of share group rebalances",
-            "protocol", "share"
+            "The total number of share group rebalances"
         ), 50);
 
         shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);

Reply via email to