dajac commented on code in PR #14848:
URL: https://github.com/apache/kafka/pull/14848#discussion_r1422297921


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##########
@@ -29,44 +28,75 @@
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * These are the metrics which are managed by the {@link 
org.apache.kafka.coordinator.group.GroupMetadataManager} class.
  * They generally pertain to aspects of group management, such as the number 
of groups in different states.
  */
 public class GroupCoordinatorMetrics extends CoordinatorMetrics implements 
AutoCloseable {
+
+    enum GenericGroupState {

Review Comment:
   Those two enums are really confusing because they duplicate existing ones. 
Could we just reuse the existing ones?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##########
@@ -29,44 +28,75 @@
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * These are the metrics which are managed by the {@link 
org.apache.kafka.coordinator.group.GroupMetadataManager} class.
  * They generally pertain to aspects of group management, such as the number 
of groups in different states.
  */
 public class GroupCoordinatorMetrics extends CoordinatorMetrics implements 
AutoCloseable {
+
+    enum GenericGroupState {
+        ALL("all"),
+        PREPARING_REBALANCE("PreparingRebalance"),
+        COMPLETING_REBALANCE("CompletingRebalance"),
+        STABLE("Stable"),
+        DEAD("Dead"),
+        EMPTY("Empty");
+
+        private final String name;
+
+        GenericGroupState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    enum ConsumerGroupState {
+        ALL("all"),
+        EMPTY("empty"),
+        ASSIGNING("assigning"),
+        RECONCILING("reconciling"),
+        STABLE("stable"),
+        DEAD("dead");
+
+        private final String name;
+
+        ConsumerGroupState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
     public static final String METRICS_GROUP = "group-coordinator-metrics";
 
-    public final static MetricName NUM_OFFSETS = getMetricName(
+    public final static com.yammer.metrics.core.MetricName NUM_OFFSETS = 
getMetricName(
         "GroupMetadataManager", "NumOffsets");
-    public final static MetricName NUM_GENERIC_GROUPS = getMetricName(
+    public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS 
= getMetricName(
         "GroupMetadataManager", "NumGroups");
-    public final static MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = 
getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName(
         "GroupMetadataManager", "NumGroupsPreparingRebalance");
-    public final static MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = 
getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName(
         "GroupMetadataManager", "NumGroupsCompletingRebalance");
-    public final static MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_STABLE = getMetricName(
         "GroupMetadataManager", "NumGroupsStable");
-    public final static MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_DEAD = getMetricName(
         "GroupMetadataManager", "NumGroupsDead");
-    public final static MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_EMPTY = getMetricName(
         "GroupMetadataManager", "NumGroupsEmpty");
-    public final static MetricName NUM_CONSUMER_GROUPS = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroups");
-    public final static MetricName NUM_CONSUMER_GROUPS_EMPTY = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsEmpty");
-    public final static MetricName NUM_CONSUMER_GROUPS_ASSIGNING = 
getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsAssigning");
-    public final static MetricName NUM_CONSUMER_GROUPS_RECONCILING = 
getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsReconciling");
-    public final static MetricName NUM_CONSUMER_GROUPS_STABLE = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsStable");
-    public final static MetricName NUM_CONSUMER_GROUPS_DEAD = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsDead");
+
+    public final static String NUM_CONSUMER_GROUPS_METRIC_NAME = 
"consumer-groups-size";

Review Comment:
   It looks like we also use `count` in other places.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##########
@@ -281,100 +306,98 @@ public void onUpdateLastCommittedOffset(TopicPartition 
tp, long offset) {
         }
     }
 
-    public static MetricName getMetricName(String type, String name) {
+    public static com.yammer.metrics.core.MetricName getMetricName(String 
type, String name) {
         return getMetricName("kafka.coordinator.group", type, name);
     }
 
     private void registerGauges() {
-        registry.newGauge(NUM_OFFSETS, new Gauge<Long>() {
+        registry.newGauge(NUM_OFFSETS, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
                 return numOffsets();
             }
         });
 
-        registry.newGauge(NUM_GENERIC_GROUPS, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
                 return numGenericGroups();
             }
         });
 
-        registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsPreparingRebalanceCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsCompletingRebalanceCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsStableCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsDeadCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numGenericGroupsEmptyCount();
+                return numGenericGroupsPreparingRebalance();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroups();
+                return numGenericGroupsCompletingRebalance();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_EMPTY, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroupsEmpty();
+                return numGenericGroupsStable();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_ASSIGNING, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroupsAssigning();
+                return numGenericGroupsDead();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_RECONCILING, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroupsReconciling();
+                return numGenericGroupsEmpty();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_STABLE, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numConsumerGroupsStable();
-            }
-        });
-
-        registry.newGauge(NUM_CONSUMER_GROUPS_DEAD, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numConsumerGroupsDead();
-            }
-        });
+        metrics.addMetric(
+            metrics.metricName(NUM_CONSUMER_GROUPS_METRIC_NAME, METRICS_GROUP),
+            (Gauge<Long>) (config, now) -> numConsumerGroups()

Review Comment:
   I wonder if we could have `NUM_GROUPS_METRIC_NAME` with a tag called `type`. 
This would work for all group types in the future.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##########
@@ -29,44 +28,75 @@
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * These are the metrics which are managed by the {@link 
org.apache.kafka.coordinator.group.GroupMetadataManager} class.
  * They generally pertain to aspects of group management, such as the number 
of groups in different states.
  */
 public class GroupCoordinatorMetrics extends CoordinatorMetrics implements 
AutoCloseable {
+
+    enum GenericGroupState {
+        ALL("all"),
+        PREPARING_REBALANCE("PreparingRebalance"),
+        COMPLETING_REBALANCE("CompletingRebalance"),
+        STABLE("Stable"),
+        DEAD("Dead"),
+        EMPTY("Empty");
+
+        private final String name;
+
+        GenericGroupState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
+    enum ConsumerGroupState {
+        ALL("all"),
+        EMPTY("empty"),
+        ASSIGNING("assigning"),
+        RECONCILING("reconciling"),
+        STABLE("stable"),
+        DEAD("dead");
+
+        private final String name;
+
+        ConsumerGroupState(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
     public static final String METRICS_GROUP = "group-coordinator-metrics";
 
-    public final static MetricName NUM_OFFSETS = getMetricName(
+    public final static com.yammer.metrics.core.MetricName NUM_OFFSETS = 
getMetricName(
         "GroupMetadataManager", "NumOffsets");
-    public final static MetricName NUM_GENERIC_GROUPS = getMetricName(
+    public final static com.yammer.metrics.core.MetricName NUM_GENERIC_GROUPS 
= getMetricName(
         "GroupMetadataManager", "NumGroups");
-    public final static MetricName NUM_GENERIC_GROUPS_PREPARING_REBALANCE = 
getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_PREPARING_REBALANCE = getMetricName(
         "GroupMetadataManager", "NumGroupsPreparingRebalance");
-    public final static MetricName NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = 
getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_COMPLETING_REBALANCE = getMetricName(
         "GroupMetadataManager", "NumGroupsCompletingRebalance");
-    public final static MetricName NUM_GENERIC_GROUPS_STABLE = getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_STABLE = getMetricName(
         "GroupMetadataManager", "NumGroupsStable");
-    public final static MetricName NUM_GENERIC_GROUPS_DEAD = getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_DEAD = getMetricName(
         "GroupMetadataManager", "NumGroupsDead");
-    public final static MetricName NUM_GENERIC_GROUPS_EMPTY = getMetricName(
+    public final static com.yammer.metrics.core.MetricName 
NUM_GENERIC_GROUPS_EMPTY = getMetricName(
         "GroupMetadataManager", "NumGroupsEmpty");
-    public final static MetricName NUM_CONSUMER_GROUPS = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroups");
-    public final static MetricName NUM_CONSUMER_GROUPS_EMPTY = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsEmpty");
-    public final static MetricName NUM_CONSUMER_GROUPS_ASSIGNING = 
getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsAssigning");
-    public final static MetricName NUM_CONSUMER_GROUPS_RECONCILING = 
getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsReconciling");
-    public final static MetricName NUM_CONSUMER_GROUPS_STABLE = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsStable");
-    public final static MetricName NUM_CONSUMER_GROUPS_DEAD = getMetricName(
-        "GroupMetadataManager", "NumConsumerGroupsDead");
+
+    public final static String NUM_CONSUMER_GROUPS_METRIC_NAME = 
"consumer-groups-size";

Review Comment:
   I am debating the name for this one. In the coordinator runtime metrics, we 
used `num-` instead of `-size`. I wonder whether we should do the same here. 
What do you think? Have you seen `-size` used more often?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetrics.java:
##########
@@ -281,100 +306,98 @@ public void onUpdateLastCommittedOffset(TopicPartition 
tp, long offset) {
         }
     }
 
-    public static MetricName getMetricName(String type, String name) {
+    public static com.yammer.metrics.core.MetricName getMetricName(String 
type, String name) {
         return getMetricName("kafka.coordinator.group", type, name);
     }
 
     private void registerGauges() {
-        registry.newGauge(NUM_OFFSETS, new Gauge<Long>() {
+        registry.newGauge(NUM_OFFSETS, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
                 return numOffsets();
             }
         });
 
-        registry.newGauge(NUM_GENERIC_GROUPS, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
                 return numGenericGroups();
             }
         });
 
-        registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsPreparingRebalanceCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsCompletingRebalanceCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsStableCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numGenericGroupsDeadCount();
-            }
-        });
-
-        registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_PREPARING_REBALANCE, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numGenericGroupsEmptyCount();
+                return numGenericGroupsPreparingRebalance();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_COMPLETING_REBALANCE, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroups();
+                return numGenericGroupsCompletingRebalance();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_EMPTY, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_STABLE, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroupsEmpty();
+                return numGenericGroupsStable();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_ASSIGNING, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_DEAD, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroupsAssigning();
+                return numGenericGroupsDead();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_RECONCILING, new Gauge<Long>() {
+        registry.newGauge(NUM_GENERIC_GROUPS_EMPTY, new 
com.yammer.metrics.core.Gauge<Long>() {
             @Override
             public Long value() {
-                return numConsumerGroupsReconciling();
+                return numGenericGroupsEmpty();
             }
         });
 
-        registry.newGauge(NUM_CONSUMER_GROUPS_STABLE, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numConsumerGroupsStable();
-            }
-        });
-
-        registry.newGauge(NUM_CONSUMER_GROUPS_DEAD, new Gauge<Long>() {
-            @Override
-            public Long value() {
-                return numConsumerGroupsDead();
-            }
-        });
+        metrics.addMetric(
+            metrics.metricName(NUM_CONSUMER_GROUPS_METRIC_NAME, METRICS_GROUP),
+            (Gauge<Long>) (config, now) -> numConsumerGroups()
+        );
+
+        metrics.addMetric(
+            metrics.metricName(NUM_CONSUMER_GROUPS_METRIC_NAME, METRICS_GROUP, 
Utils.mkMap(
+                Utils.mkEntry(NUM_CONSUMER_GROUPS_STATE_TAG, 
ConsumerGroupState.EMPTY.toString())
+            )),

Review Comment:
   nit: Collections.singletonMap()? This also applies to the other ones below.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/GroupCoordinatorMetricsShard.java:
##########
@@ -95,95 +88,147 @@ public TimelineGaugeCounter(TimelineLong timelineLong, 
AtomicLong atomicLong) {
     public GroupCoordinatorMetricsShard(
         SnapshotRegistry snapshotRegistry,
         Map<String, Sensor> globalSensors,
-        Map<String, AtomicLong> globalGauges,
         TopicPartition topicPartition
     ) {
         Objects.requireNonNull(snapshotRegistry);
-        TimelineLong numOffsetsTimeline = new TimelineLong(snapshotRegistry);
-        TimelineLong numGenericGroupsTimeline = new 
TimelineLong(snapshotRegistry);
-        TimelineLong numConsumerGroupsTimeline = new 
TimelineLong(snapshotRegistry);
+        numOffsetsTimelineGaugeCounter = new TimelineGaugeCounter(new 
TimelineLong(snapshotRegistry), new AtomicLong(0));
+        numGenericGroupsTimelineCounter = new TimelineGaugeCounter(new 
TimelineLong(snapshotRegistry), new AtomicLong(0));
+
+        this.genericGroupGauges = Utils.mkMap(
+            
Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.PREPARING_REBALANCE, 
new AtomicLong(0)),
+            
Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.COMPLETING_REBALANCE, 
new AtomicLong(0)),
+            Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.STABLE, 
new AtomicLong(0)),
+            Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.DEAD, new 
AtomicLong(0)),
+            Utils.mkEntry(GroupCoordinatorMetrics.GenericGroupState.EMPTY, new 
AtomicLong(0))
+        );
+        
         TimelineLong numConsumerGroupsEmptyTimeline = new 
TimelineLong(snapshotRegistry);
         TimelineLong numConsumerGroupsAssigningTimeline = new 
TimelineLong(snapshotRegistry);
         TimelineLong numConsumerGroupsReconcilingTimeline = new 
TimelineLong(snapshotRegistry);
         TimelineLong numConsumerGroupsStableTimeline = new 
TimelineLong(snapshotRegistry);
         TimelineLong numConsumerGroupsDeadTimeline = new 
TimelineLong(snapshotRegistry);
 
-        this.localGauges = Collections.unmodifiableMap(Utils.mkMap(
-            Utils.mkEntry(NUM_OFFSETS.getName(),
-                new TimelineGaugeCounter(numOffsetsTimeline, new 
AtomicLong(0))),
-            Utils.mkEntry(NUM_GENERIC_GROUPS.getName(),
-                new TimelineGaugeCounter(numGenericGroupsTimeline, new 
AtomicLong(0))),
-            Utils.mkEntry(NUM_CONSUMER_GROUPS.getName(),
-                new TimelineGaugeCounter(numConsumerGroupsTimeline, new 
AtomicLong(0))),
-            Utils.mkEntry(NUM_CONSUMER_GROUPS_EMPTY.getName(),
+        this.consumerGroupGauges = Utils.mkMap(
+            Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.EMPTY,
                 new TimelineGaugeCounter(numConsumerGroupsEmptyTimeline, new 
AtomicLong(0))),
-            Utils.mkEntry(NUM_CONSUMER_GROUPS_ASSIGNING.getName(),
+            Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.ASSIGNING,
                 new TimelineGaugeCounter(numConsumerGroupsAssigningTimeline, 
new AtomicLong(0))),
-            Utils.mkEntry(NUM_CONSUMER_GROUPS_RECONCILING.getName(),
+            
Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.RECONCILING,
                 new TimelineGaugeCounter(numConsumerGroupsReconcilingTimeline, 
new AtomicLong(0))),
-            Utils.mkEntry(NUM_CONSUMER_GROUPS_STABLE.getName(),
+            Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.STABLE,
                 new TimelineGaugeCounter(numConsumerGroupsStableTimeline, new 
AtomicLong(0))),
-            Utils.mkEntry(NUM_CONSUMER_GROUPS_DEAD.getName(),
+            Utils.mkEntry(GroupCoordinatorMetrics.ConsumerGroupState.DEAD,
                 new TimelineGaugeCounter(numConsumerGroupsDeadTimeline, new 
AtomicLong(0)))
-        ));
+        );
 
         this.globalSensors = Objects.requireNonNull(globalSensors);
-        this.globalGauges = Objects.requireNonNull(globalGauges);
         this.topicPartition = Objects.requireNonNull(topicPartition);
     }
 
-    @Override
-    public void incrementGlobalGauge(MetricName metricName) {
-        AtomicLong gaugeCounter = globalGauges.get(metricName.getName());
-        if (gaugeCounter != null) {
-            gaugeCounter.incrementAndGet();
+    public void 
incrementNumGenericGroups(GroupCoordinatorMetrics.GenericGroupState state) {
+        AtomicLong counter = genericGroupGauges.get(state);
+        if (counter != null) {
+            counter.incrementAndGet();
         }
     }
 
-    @Override
-    public void incrementLocalGauge(MetricName metricName) {
-        TimelineGaugeCounter gaugeCounter = 
localGauges.get(metricName.getName());
+    /**
+     * Increment the number of offsets.
+     */
+    public void incrementNumOffsets() {
+        synchronized (numOffsetsTimelineGaugeCounter.timelineLong) {
+            numOffsetsTimelineGaugeCounter.timelineLong.increment();
+        }
+    }
+
+    /**
+     * Increment the number of consumer groups.
+     *
+     * @param state the consumer group state.
+     */
+    public void 
incrementNumConsumerGroups(GroupCoordinatorMetrics.ConsumerGroupState state) {
+        TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
         if (gaugeCounter != null) {
             synchronized (gaugeCounter.timelineLong) {
                 gaugeCounter.timelineLong.increment();
             }
         }
     }
 
-    @Override
-    public void decrementGlobalGauge(MetricName metricName) {
-        AtomicLong gaugeCounter = globalGauges.get(metricName.getName());
-        if (gaugeCounter != null) {
-            gaugeCounter.decrementAndGet();
+    /**
+     * Decrement the number of offsets.
+     */
+    public void decrementNumOffsets() {
+        synchronized (numOffsetsTimelineGaugeCounter.timelineLong) {
+            numOffsetsTimelineGaugeCounter.timelineLong.decrement();
         }
     }
 
-    @Override
-    public void decrementLocalGauge(MetricName metricName) {
-        TimelineGaugeCounter gaugeCounter = 
localGauges.get(metricName.getName());
+    /**
+     * Decrement the number of consumer groups.
+     *
+     * @param state the consumer group state.
+     */
+    public void 
decrementNumGenericGroups(GroupCoordinatorMetrics.GenericGroupState state) {
+        AtomicLong counter = genericGroupGauges.get(state);
+        if (counter != null) {
+            counter.decrementAndGet();
+        }
+    }
+
+    /**
+     * Decrement the number of consumer groups.
+     *
+     * @param state the consumer group state.
+     */
+    public void 
decrementNumConsumerGroups(GroupCoordinatorMetrics.ConsumerGroupState state) {
+        TimelineGaugeCounter gaugeCounter = consumerGroupGauges.get(state);
         if (gaugeCounter != null) {
             synchronized (gaugeCounter.timelineLong) {
                 gaugeCounter.timelineLong.decrement();
             }
         }
     }
 
-    @Override
-    public long globalGaugeValue(MetricName metricName) {
-        AtomicLong gaugeCounter = globalGauges.get(metricName.getName());
-        if (gaugeCounter != null) {
-            return gaugeCounter.get();
+    /**
+     * Obtain the number of offsets.
+     */
+    public long numOffsets() {
+        return numOffsetsTimelineGaugeCounter.atomicLong.get();
+    }
+
+    /**
+     * Obtain the number of generic groups.
+     */
+    public long numGenericGroups(GroupCoordinatorMetrics.GenericGroupState 
state) {
+        if (state == GroupCoordinatorMetrics.GenericGroupState.ALL) {
+            return genericGroupGauges.values().stream()
+                .mapToLong(AtomicLong::get).sum();
         }
-        return 0;
+
+        AtomicLong counter = genericGroupGauges.get(state);
+        if (counter != null) {
+            return counter.get();
+        }
+        return 0L;
     }
 
-    @Override
-    public long localGaugeValue(MetricName metricName) {
-        TimelineGaugeCounter gaugeCounter = 
localGauges.get(metricName.getName());
+    /**
+     * Obtain the current value of a local consumer group gauge.
+     *
+     * @param state  the consumer group state.
+     */
+    public long numConsumerGroups(GroupCoordinatorMetrics.ConsumerGroupState 
state) {
+        if (state == GroupCoordinatorMetrics.ConsumerGroupState.ALL) {

Review Comment:
   Do we actually call it with `ALL`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to