This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 815dd93e2f9 MINOR: Invoke share group rebalance sensor. (#20006) 815dd93e2f9 is described below commit 815dd93e2f9936bb9a2b738b67b94e270ab0e48a Author: Sushant Mahajan <smaha...@confluent.io> AuthorDate: Sat Jun 21 13:05:19 2025 +0530 MINOR: Invoke share group rebalance sensor. (#20006) * The share group rebalance metric was not being recorded at the point where the group epoch is bumped. * This PR solves the issue. * The metric names have been updated (s/rebalance-rate/share-group-rebalance-rate/, s/rebalance-count/share-group-rebalance-count/) * Updated tests in `GroupMetadataManagerTest` and `GroupCoordinatorMetricsTest` Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../coordinator/group/GroupMetadataManager.java | 2 ++ .../group/metrics/GroupCoordinatorMetrics.java | 10 ++++------ .../coordinator/group/GroupMetadataManagerTest.java | 5 +++++ .../group/metrics/GroupCoordinatorMetricsTest.java | 20 ++++++-------------- 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 20b9337fb41..9c9500485b1 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -248,6 +248,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static 
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged; import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord; @@ -2610,6 +2611,7 @@ public class GroupMetadataManager { groupEpoch += 1; records.add(newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)); log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash); + metrics.record(SHARE_GROUP_REBALANCES_SENSOR_NAME); } group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java index dd21570b654..9d05727d2e9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java @@ -297,14 +297,12 @@ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoC Sensor shareGroupRebalanceSensor = metrics.sensor(SHARE_GROUP_REBALANCES_SENSOR_NAME); shareGroupRebalanceSensor.add(new Meter( - metrics.metricName("rebalance-rate", + metrics.metricName("share-group-rebalance-rate", METRICS_GROUP, - "The rate of share group rebalances", - SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString()), - metrics.metricName("rebalance-count", + "The rate of share group rebalances"), + metrics.metricName("share-group-rebalance-count", METRICS_GROUP, - "The total number of share group rebalances", - 
SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString()))); + "The total number of share group rebalances"))); Sensor streamsGroupRebalanceSensor = metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME); streamsGroupRebalanceSensor.add(new Meter( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 3f47b8e70d8..1cf13c70490 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -203,6 +203,7 @@ import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA import static org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME; import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME; +import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -19699,6 +19700,7 @@ public class GroupMetadataManagerTest { .setMemberId(Uuid.randomUuid().toString()) .setMemberEpoch(1) .setSubscribedTopicNames(List.of("foo", "bar")))); + verify(context.metrics, times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME); } @Test @@ -19785,6 +19787,7 @@ public class GroupMetadataManagerTest { .setGroupId(groupId) .setMemberEpoch(0) .setSubscribedTopicNames(List.of("foo", "bar")))); + verify(context.metrics, times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME); } @Test @@ -22306,6 +22309,7 @@ public class 
GroupMetadataManagerTest { ); assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)), context.groupMetadataManager.initializedShareGroupPartitions(groupId)); + verify(context.metrics, times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME); } @Test @@ -22415,6 +22419,7 @@ public class GroupMetadataManagerTest { 2, true ); + verify(context.metrics, times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME); } @Test diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java index fa285b5bbf6..f4d2ad325b1 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java @@ -104,14 +104,8 @@ public class GroupCoordinatorMetricsTest { "group-count", GroupCoordinatorMetrics.METRICS_GROUP, Map.of("protocol", Group.GroupType.SHARE.toString())), - metrics.metricName( - "rebalance-rate", - GroupCoordinatorMetrics.METRICS_GROUP, - Map.of("protocol", Group.GroupType.SHARE.toString())), - metrics.metricName( - "rebalance-count", - GroupCoordinatorMetrics.METRICS_GROUP, - Map.of("protocol", Group.GroupType.SHARE.toString())), + metrics.metricName("share-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP), + metrics.metricName("share-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP), metrics.metricName( "share-group-count", GroupCoordinatorMetrics.METRICS_GROUP, @@ -304,16 +298,14 @@ public class GroupCoordinatorMetricsTest { shard.record(SHARE_GROUP_REBALANCES_SENSOR_NAME, 50); assertMetricValue(metrics, metrics.metricName( - "rebalance-rate", + "share-group-rebalance-rate", GroupCoordinatorMetrics.METRICS_GROUP, - "The rate of share group rebalances", - "protocol", "share" + "The rate of share group rebalances" ), 5.0 / 
3.0); assertMetricValue(metrics, metrics.metricName( - "rebalance-count", + "share-group-rebalance-count", GroupCoordinatorMetrics.METRICS_GROUP, - "The total number of share group rebalances", - "protocol", "share" + "The total number of share group rebalances" ), 50); shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);