This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 4c63711971b MINOR: Invoke share group rebalance sensor. (#20006)
4c63711971b is described below
commit 4c63711971b01b97e4ff1a6024c305d52a422d5c
Author: Sushant Mahajan <[email protected]>
AuthorDate: Sat Jun 21 13:05:19 2025 +0530
MINOR: Invoke share group rebalance sensor. (#20006)
* The share group rebalance sensor was not being recorded at the point
where the group epoch is bumped.
* This PR solves the issue.
* The metric names have been updated
(s/rebalance-rate/share-group-rebalance-rate/,
s/rebalance-count/share-group-rebalance-count/) and no longer carry the
`protocol` tag.
* Updated tests in `GroupMetadataManagerTest` and
`GroupCoordinatorMetricsTest`
Reviewers: Andrew Schofield <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 2 ++
.../group/metrics/GroupCoordinatorMetrics.java | 10 ++++------
.../coordinator/group/GroupMetadataManagerTest.java | 5 +++++
.../group/metrics/GroupCoordinatorMetricsTest.java | 20 ++++++--------------
4 files changed, 17 insertions(+), 20 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 20b9337fb41..9c9500485b1 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -248,6 +248,7 @@ import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
+import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
import static
org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
@@ -2610,6 +2611,7 @@ public class GroupMetadataManager {
groupEpoch += 1;
records.add(newShareGroupEpochRecord(groupId, groupEpoch,
groupMetadataHash));
log.info("[GroupId {}] Bumped group epoch to {} with metadata
hash {}.", groupId, groupEpoch, groupMetadataHash);
+ metrics.record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}
group.setMetadataRefreshDeadline(currentTimeMs +
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
index dd21570b654..9d05727d2e9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java
@@ -297,14 +297,12 @@ public class GroupCoordinatorMetrics extends
CoordinatorMetrics implements AutoC
Sensor shareGroupRebalanceSensor =
metrics.sensor(SHARE_GROUP_REBALANCES_SENSOR_NAME);
shareGroupRebalanceSensor.add(new Meter(
- metrics.metricName("rebalance-rate",
+ metrics.metricName("share-group-rebalance-rate",
METRICS_GROUP,
- "The rate of share group rebalances",
- SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString()),
- metrics.metricName("rebalance-count",
+ "The rate of share group rebalances"),
+ metrics.metricName("share-group-rebalance-count",
METRICS_GROUP,
- "The total number of share group rebalances",
- SHARE_GROUP_PROTOCOL_TAG, Group.GroupType.SHARE.toString())));
+ "The total number of share group rebalances")));
Sensor streamsGroupRebalanceSensor =
metrics.sensor(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
streamsGroupRebalanceSensor.add(new Meter(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f47b8e70d8..1cf13c70490 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -203,6 +203,7 @@ import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.PREPA
import static
org.apache.kafka.coordinator.group.classic.ClassicGroupState.STABLE;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CLASSIC_GROUP_COMPLETED_REBALANCES_SENSOR_NAME;
import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
+import static
org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -19699,6 +19700,7 @@ public class GroupMetadataManagerTest {
.setMemberId(Uuid.randomUuid().toString())
.setMemberEpoch(1)
.setSubscribedTopicNames(List.of("foo", "bar"))));
+ verify(context.metrics,
times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}
@Test
@@ -19785,6 +19787,7 @@ public class GroupMetadataManagerTest {
.setGroupId(groupId)
.setMemberEpoch(0)
.setSubscribedTopicNames(List.of("foo", "bar"))));
+ verify(context.metrics,
times(0)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}
@Test
@@ -22306,6 +22309,7 @@ public class GroupMetadataManagerTest {
);
assertEquals(Map.of(t1Uuid, Set.of(0, 1), t2Uuid, Set.of(0, 1)),
context.groupMetadataManager.initializedShareGroupPartitions(groupId));
+ verify(context.metrics,
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}
@Test
@@ -22415,6 +22419,7 @@ public class GroupMetadataManagerTest {
2,
true
);
+ verify(context.metrics,
times(2)).record(SHARE_GROUP_REBALANCES_SENSOR_NAME);
}
@Test
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
index 3aa0a861725..92a7cda8738 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsTest.java
@@ -106,14 +106,8 @@ public class GroupCoordinatorMetricsTest {
"group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
Map.of("protocol", Group.GroupType.SHARE.toString())),
- metrics.metricName(
- "rebalance-rate",
- GroupCoordinatorMetrics.METRICS_GROUP,
- Map.of("protocol", Group.GroupType.SHARE.toString())),
- metrics.metricName(
- "rebalance-count",
- GroupCoordinatorMetrics.METRICS_GROUP,
- Map.of("protocol", Group.GroupType.SHARE.toString())),
+ metrics.metricName("share-group-rebalance-rate",
GroupCoordinatorMetrics.METRICS_GROUP),
+ metrics.metricName("share-group-rebalance-count",
GroupCoordinatorMetrics.METRICS_GROUP),
metrics.metricName(
"share-group-count",
GroupCoordinatorMetrics.METRICS_GROUP,
@@ -306,16 +300,14 @@ public class GroupCoordinatorMetricsTest {
shard.record(SHARE_GROUP_REBALANCES_SENSOR_NAME, 50);
assertMetricValue(metrics, metrics.metricName(
- "rebalance-rate",
+ "share-group-rebalance-rate",
GroupCoordinatorMetrics.METRICS_GROUP,
- "The rate of share group rebalances",
- "protocol", "share"
+ "The rate of share group rebalances"
), 5.0 / 3.0);
assertMetricValue(metrics, metrics.metricName(
- "rebalance-count",
+ "share-group-rebalance-count",
GroupCoordinatorMetrics.METRICS_GROUP,
- "The total number of share group rebalances",
- "protocol", "share"
+ "The total number of share group rebalances"
), 50);
shard.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME, 50);