dajac commented on code in PR #14848: URL: https://github.com/apache/kafka/pull/14848#discussion_r1422297921
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ########## @@ -29,44 +28,75 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; /** * These are the metrics which are managed by the {@link org.apache.kafka.coordinator.group.GroupMetadataManager} class. * They generally pertain to aspects of group management, such as the number of groups in different states. */ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoCloseable { + + enum GenericGroupState { Review Comment: Those two enums are really confusing because they duplicate existing ones. Could we just reuse the existing ones? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ########## @@ -29,44 +28,75 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; /** * These are the metrics which are managed by the {@link org.apache.kafka.coordinator.group.GroupMetadataManager} class. * They generally pertain to aspects of group management, such as the number of groups in different states. */ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoCloseable { + + enum GenericGroupState { + ALL("all"), + PREPARING_REBALANCE("PreparingRebalance"), + COMPLETING_REBALANCE("CompletingRebalance"), + STABLE("Stable"), + DEAD("Dead"), + EMPTY("Empty"); + + private final String name; + + GenericGroupState(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + + enum ConsumerGroupState { + ALL("all"), + EMPTY("empty"), + ASSIGNING("assigning"), + RECONCILING("reconciling"), + STABLE("stable"), + DEAD("dead"); + + private final String name; + + ConsumerGroupState(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + public static final String METRICS_GROUP = "group-coordinator-metrics"; - public final static MetricName NUM_OFFSETS = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_OFFSETS = getMetricName( "GroupMetadataManager", "NumOffsets"); - public final static MetricName NUM_GENERIC_GROUPS = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS = getMetricName( "GroupMetadataManager", "NumGroups"); - public final static MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName( "GroupMetadataManager", "NumGroupsPreparingRebalance"); - public final static MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName( "GroupMetadataManager", "NumGroupsCompletingRebalance"); - public final static MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName( "GroupMetadataManager", "NumGroupsStable"); - public final static MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName( "GroupMetadataManager", "NumGroupsDead"); - public final static MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName( "GroupMetadataManager", "NumGroupsEmpty"); - public final static MetricName NUM_CONSUMER_GROUPS = getMetricName( - "GroupMetadataManager", "NumConsumerGroups"); - public final static MetricName NUM_CONSUMER_GROUPS_EMPTY = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsEmpty"); - public final static MetricName NUM_CONSUMER_GROUPS_ASSIGNING = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsAssigning"); - public final static MetricName NUM_CONSUMER_GROUPS_RECONCILING = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsReconciling"); - public final static MetricName NUM_CONSUMER_GROUPS_STABLE = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsStable"); - public final static MetricName NUM_CONSUMER_GROUPS_DEAD = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsDead"); + + public final static String NUM_CONSUMER_GROUPS_METRIC_NAME = "consumer-groups-size"; Review Comment: It looks like we also use `count` in other places. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ########## @@ -281,100 +306,98 @@ public void onUpdateLastCommittedOffset(TopicPartition tp, long offset) { } } - public static MetricName getMetricName(String type, String name) { + public static com.yammer.metrics.core.MetricName getMetricName(String type, String name) { return getMetricName("kafka.coordinator.group", type, name); } private void registerGauges() { - registry.newGauge(NUM_OFFSETS, new Gauge<Long>() { + registry.newGauge(NUM_OFFSETS, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { return numOffsets(); } }); - registry.newGauge(NUM_GENERIC_GROUPS, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { return numGenericGroups(); } }); - registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsPreparingRebalanceCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsCompletingRebalanceCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsStableCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsDeadCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numGenericGroupsEmptyCount(); + return numGenericGroupsPreparingRebalance(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroups(); + return numGenericGroupsCompletingRebalance(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_EMPTY, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroupsEmpty(); + return numGenericGroupsStable(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_ASSIGNING, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroupsAssigning(); + return numGenericGroupsDead(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_RECONCILING, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroupsReconciling(); + return numGenericGroupsEmpty(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_STABLE, new Gauge<Long>() { - @Override - public Long value() { - return numConsumerGroupsStable(); - } - }); - - registry.newGauge(NUM_CONSUMER_GROUPS_DEAD, new Gauge<Long>() { - @Override - public Long value() { - return numConsumerGroupsDead(); - } - }); + metrics.addMetric( + metrics.metricName(NUM_CONSUMER_GROUPS_METRIC_NAME, METRICS_GROUP), + (Gauge<Long>) (config, now) -> numConsumerGroups() Review Comment: I wonder if we could have `NUM_GROUPS_METRIC_NAME` with a tag called `type`. This would work for all group types in the future. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ########## @@ -29,44 +28,75 @@ import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.Objects; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; /** * These are the metrics which are managed by the {@link org.apache.kafka.coordinator.group.GroupMetadataManager} class. * They generally pertain to aspects of group management, such as the number of groups in different states. */ public class GroupCoordinatorMetrics extends CoordinatorMetrics implements AutoCloseable { + + enum GenericGroupState { + ALL("all"), + PREPARING_REBALANCE("PreparingRebalance"), + COMPLETING_REBALANCE("CompletingRebalance"), + STABLE("Stable"), + DEAD("Dead"), + EMPTY("Empty"); + + private final String name; + + GenericGroupState(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + + enum ConsumerGroupState { + ALL("all"), + EMPTY("empty"), + ASSIGNING("assigning"), + RECONCILING("reconciling"), + STABLE("stable"), + DEAD("dead"); + + private final String name; + + ConsumerGroupState(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + public static final String METRICS_GROUP = "group-coordinator-metrics"; - public final static MetricName NUM_OFFSETS = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_OFFSETS = getMetricName( "GroupMetadataManager", "NumOffsets"); - public final static MetricName NUM_GENERIC_GROUPS = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS = getMetricName( "GroupMetadataManager", "NumGroups"); - public final static MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName( "GroupMetadataManager", "NumGroupsPreparingRebalance"); - public final static MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName( "GroupMetadataManager", "NumGroupsCompletingRebalance"); - public final static MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName( "GroupMetadataManager", "NumGroupsStable"); - public final static MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName( "GroupMetadataManager", "NumGroupsDead"); - public final static MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName( + public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName( "GroupMetadataManager", "NumGroupsEmpty"); - public final static MetricName NUM_CONSUMER_GROUPS = getMetricName( - "GroupMetadataManager", "NumConsumerGroups"); - public final static MetricName NUM_CONSUMER_GROUPS_EMPTY = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsEmpty"); - public final static MetricName NUM_CONSUMER_GROUPS_ASSIGNING = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsAssigning"); - public final static MetricName NUM_CONSUMER_GROUPS_RECONCILING = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsReconciling"); - public final static MetricName NUM_CONSUMER_GROUPS_STABLE = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsStable"); - public final static MetricName NUM_CONSUMER_GROUPS_DEAD = getMetricName( - "GroupMetadataManager", "NumConsumerGroupsDead"); + + public final static String NUM_CONSUMER_GROUPS_METRIC_NAME = "consumer-groups-size"; Review Comment: I am debating the name for this one. In the coordinator runtime metrics, we used `num-` instead of `-size`. I wonder whether we should do the same here. What do you think? Have you seen `-size` used more often? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java: ########## @@ -281,100 +306,98 @@ public void onUpdateLastCommittedOffset(TopicPartition tp, long offset) { } } - public static MetricName getMetricName(String type, String name) { + public static com.yammer.metrics.core.MetricName getMetricName(String type, String name) { return getMetricName("kafka.coordinator.group", type, name); } private void registerGauges() { - registry.newGauge(NUM_OFFSETS, new Gauge<Long>() { + registry.newGauge(NUM_OFFSETS, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { return numOffsets(); } }); - registry.newGauge(NUM_GENERIC_GROUPS, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { return numGenericGroups(); } }); - registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsPreparingRebalanceCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsCompletingRebalanceCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsStableCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new Gauge<Long>() { - @Override - public Long value() { - return numGenericGroupsDeadCount(); - } - }); - - registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numGenericGroupsEmptyCount(); + return numGenericGroupsPreparingRebalance(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroups(); + return numGenericGroupsCompletingRebalance(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_EMPTY, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroupsEmpty(); + return numGenericGroupsStable(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_ASSIGNING, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroupsAssigning(); + return numGenericGroupsDead(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_RECONCILING, new Gauge<Long>() { + registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new com.yammer.metrics.core.Gauge<Long>() { @Override public Long value() { - return numConsumerGroupsReconciling(); + return numGenericGroupsEmpty(); } }); - registry.newGauge(NUM_CONSUMER_GROUPS_STABLE, new Gauge<Long>() { - @Override - public Long value() { - return numConsumerGroupsStable(); - } - }); - - registry.newGauge(NUM_CONSUMER_GROUPS_DEAD, new Gauge<Long>() { - @Override - public Long value() { - return numConsumerGroupsDead(); - } - }); + metrics.addMetric( + metrics.metricName(NUM_CONSUMER_GROUPS_METRIC_NAME, METRICS_GROUP), + (Gauge<Long>) (config, now) -> numConsumerGroups() + ); + + metrics.addMetric( + metrics.metricName(NUM_CONSUMER_GROUPS_METRIC_NAME, METRICS_GROUP, Utils.mkMap( + Utils.mkEntry(NUM_CONSUMER_GROUPS_STATE_TAG, ConsumerGroupState.EMPTY.toString()) + )), Review Comment: nit: Collections.singletonMap()? This also applies to the other ones below. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java: ########## @@ -95,95 +88,147 @@ public TimelineGaugeCounter(TimelineLong timelineLong, AtomicLong atomicLong) { public GroupCoordinatorMetricsShard( SnapshotRegistry snapshotRegistry, Map<String, Sensor> globalSensors, - Map<String, AtomicLong> globalGauges, TopicPartition topicPartition ) { Objects.requireNonNull(snapshotRegistry); - TimelineLong numOffsetsTimeline = new TimelineLong(snapshotRegistry); - TimelineLong numGenericGroupsTimeline = new TimelineLong(snapshotRegistry); - TimelineLong numConsumerGroupsTimeline = new TimelineLong(snapshotRegistry); + numOffsetsTimelineGaugeCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)); + numGenericGroupsTimelineCounter = new TimelineGaugeCounter(new TimelineLong(snapshotRegistry), new AtomicLong(0)); + + this.genericGroupGauges = Utils.mkMap( + Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.PREPARING_REBALANCE, new AtomicLong(0)), + Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.COMPLETING_REBALANCE, new AtomicLong(0)), + Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.STABLE, new AtomicLong(0)), + Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.DEAD, new AtomicLong(0)), + Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.EMPTY, new AtomicLong(0)) + ); + TimelineLong numConsumerGroupsEmptyTimeline = new TimelineLong(snapshotRegistry); TimelineLong numConsumerGroupsAssigningTimeline = new TimelineLong(snapshotRegistry); TimelineLong numConsumerGroupsReconcilingTimeline = new TimelineLong(snapshotRegistry); TimelineLong numConsumerGroupsStableTimeline = new TimelineLong(snapshotRegistry); TimelineLong numConsumerGroupsDeadTimeline = new TimelineLong(snapshotRegistry); - this.localGauges = Collections.unmodifiableMap(Utils.mkMap( - Utils.mkEntry(NUM_OFFSETS.getName(), - new TimelineGaugeCounter(numOffsetsTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_GENERIC_GROUPS.getName(), - new TimelineGaugeCounter(numGenericGroupsTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_CONSUMER_GROUPS.getName(), - new TimelineGaugeCounter(numConsumerGroupsTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_CONSUMER_GROUPS_EMPTY.getName(), + this.consumerGroupGauges = Utils.mkMap( + Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.EMPTY, new TimelineGaugeCounter(numConsumerGroupsEmptyTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_CONSUMER_GROUPS_ASSIGNING.getName(), + Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.ASSIGNING, new TimelineGaugeCounter(numConsumerGroupsAssigningTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_CONSUMER_GROUPS_RECONCILING.getName(), + Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.RECONCILING, new TimelineGaugeCounter(numConsumerGroupsReconcilingTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_CONSUMER_GROUPS_STABLE.getName(), + Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.STABLE, new TimelineGaugeCounter(numConsumerGroupsStableTimeline, new AtomicLong(0))), - Utils.mkEntry(NUM_CONSUMER_GROUPS_DEAD.getName(), + Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.DEAD, new TimelineGaugeCounter(numConsumerGroupsDeadTimeline, new AtomicLong(0))) - )); + ); this.globalSensors = Objects.requireNonNull(globalSensors); - this.globalGauges = Objects.requireNonNull(globalGauges); this.topicPartition = Objects.requireNonNull(topicPartition); } - @Override - public void incrementGlobalGauge(MetricName metricName) { - AtomicLong gaugeCounter = globalGauges.get(metricName.getName()); - if (gaugeCounter != null) { - gaugeCounter.incrementAndGet(); + public void incrementNumGenericGroups(GroupCoordinatorMetrics.GenericGroupState state) { + AtomicLong counter = genericGroupGauges.get(state); + if (counter != null) { + counter.incrementAndGet(); } } - @Override - public void incrementLocalGauge(MetricName metricName) { - TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName()); + /** + * Increment the number of offsets. + */ + public void incrementNumOffsets() { + synchronized (numOffsetsTimelineGaugeCounter.timelineLong) { + numOffsetsTimelineGaugeCounter.timelineLong.increment(); + } + } + + /** + * Increment the number of consumer groups. + * + * @param state the consumer group state. + */ + public void incrementNumConsumerGroups(GroupCoordinatorMetrics.ConsumerGroupState state) { + TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); if (gaugeCounter != null) { synchronized (gaugeCounter.timelineLong) { gaugeCounter.timelineLong.increment(); } } } - @Override - public void decrementGlobalGauge(MetricName metricName) { - AtomicLong gaugeCounter = globalGauges.get(metricName.getName()); - if (gaugeCounter != null) { - gaugeCounter.decrementAndGet(); + /** + * Decrement the number of offsets. + */ + public void decrementNumOffsets() { + synchronized (numOffsetsTimelineGaugeCounter.timelineLong) { + numOffsetsTimelineGaugeCounter.timelineLong.decrement(); } } - @Override - public void decrementLocalGauge(MetricName metricName) { - TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName()); + /** + * Decrement the number of consumer groups. + * + * @param state the consumer group state. + */ + public void decrementNumGenericGroups(GroupCoordinatorMetrics.GenericGroupState state) { + AtomicLong counter = genericGroupGauges.get(state); + if (counter != null) { + counter.decrementAndGet(); + } + } + + /** + * Decrement the number of consumer groups. + * + * @param state the consumer group state. + */ + public void decrementNumConsumerGroups(GroupCoordinatorMetrics.ConsumerGroupState state) { + TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state); if (gaugeCounter != null) { synchronized (gaugeCounter.timelineLong) { gaugeCounter.timelineLong.decrement(); } } } - @Override - public long globalGaugeValue(MetricName metricName) { - AtomicLong gaugeCounter = globalGauges.get(metricName.getName()); - if (gaugeCounter != null) { - return gaugeCounter.get(); + /** + * Obtain the number of offsets. + */ + public long numOffsets() { + return numOffsetsTimelineGaugeCounter.atomicLong.get(); + } + + /** + * Obtain the number of generic groups. + */ + public long numGenericGroups(GroupCoordinatorMetrics.GenericGroupState state) { + if (state == GroupCoordinatorMetrics.GenericGroupState.ALL) { + return genericGroupGauges.values().stream() + .mapToLong(AtomicLong::get).sum(); } - return 0; + + AtomicLong counter = genericGroupGauges.get(state); + if (counter != null) { + return counter.get(); + } + return 0L; } - @Override - public long localGaugeValue(MetricName metricName) { - TimelineGaugeCounter gaugeCounter = localGauges.get(metricName.getName()); + /** + * Obtain the current value of a local consumer group gauge. + * + * @param state the consumer group state. + */ + public long numConsumerGroups(GroupCoordinatorMetrics.ConsumerGroupState state) { + if (state == GroupCoordinatorMetrics.ConsumerGroupState.ALL) { Review Comment: Do we actually call it with `ALL`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org