This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d226b435972 KAFKA-18220: Refactor AsyncConsumerMetrics to not extend 
KafkaConsumerMetrics (#20283)
d226b435972 is described below

commit d226b435972c2b0bac99668c17fa8942a1d431bf
Author: Shivsundar R <[email protected]>
AuthorDate: Wed Sep 3 07:35:55 2025 -0400

    KAFKA-18220: Refactor AsyncConsumerMetrics to not extend 
KafkaConsumerMetrics (#20283)
    
    *What*
    https://issues.apache.org/jira/browse/KAFKA-18220
    
    - Currently, `AsyncConsumerMetrics` extends `KafkaConsumerMetrics`, but
    it is used by both `AsyncKafkaConsumer` and `ShareConsumerImpl`.

    - `ShareConsumerImpl` only needs the async consumer metrics (the
    metrics associated with the new consumer threading model).
    - This needs to be fixed: there is no reason for `KafkaConsumerMetrics`
    to be a parent class of the `ShareConsumer` metrics.
    
    Fix:
    - In this PR, we have removed the dependency of `AsyncConsumerMetrics`
    on `KafkaConsumerMetrics` and made it an independent class that both
    `AsyncKafkaConsumer` and `ShareConsumerImpl` use.
    - The "`asyncConsumerMetrics`" field represents the metrics associated
    with the new consumer threading model (such as application event queue
    size and background event queue size), as sketched below.
    - The "`kafkaConsumerMetrics`" and "`kafkaShareConsumerMetrics`" fields
    denote the actual consumer metrics for `KafkaConsumer` and
    `KafkaShareConsumer` respectively.
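
    A minimal sketch of the new wiring, based on the constructor changes in
    the diff below (internal classes; the group-name constants come from
    `ConsumerUtils`, and the local variable names are illustrative only):

        Metrics metrics = new Metrics();

        // The async (threading-model) metrics and the classic consumer
        // metrics are now two independent objects, not a subclass instance.
        AsyncConsumerMetrics asyncConsumerMetrics =
            new AsyncConsumerMetrics(metrics, CONSUMER_METRIC_GROUP);
        KafkaConsumerMetrics kafkaConsumerMetrics =
            new KafkaConsumerMetrics(metrics);

        // ShareConsumerImpl reuses AsyncConsumerMetrics with its own group name.
        AsyncConsumerMetrics shareConsumerAsyncMetrics =
            new AsyncConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP);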
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  42 ++++--
 .../consumer/internals/ClassicKafkaConsumer.java   |   6 +-
 .../consumer/internals/ConsumerMetrics.java        |   6 +-
 .../clients/consumer/internals/ConsumerUtils.java  |   1 +
 .../consumer/internals/ShareConsumerImpl.java      |  40 ++---
 .../consumer/internals/ShareConsumerMetrics.java   |   6 +-
 .../internals/metrics/AsyncConsumerMetrics.java    |  42 +++---
 .../internals/metrics/KafkaConsumerMetrics.java    |   6 +-
 .../metrics/KafkaShareConsumerMetrics.java         |   6 +-
 .../internals/ApplicationEventHandlerTest.java     |  10 +-
 .../consumer/internals/AsyncKafkaConsumerTest.java |   4 +-
 .../internals/BackgroundEventHandlerTest.java      |  15 +-
 .../internals/ConsumerNetworkThreadTest.java       |  26 ++--
 .../internals/KafkaConsumerMetricsTest.java        |  33 +++-
 .../internals/NetworkClientDelegateTest.java       |  16 +-
 .../metrics/AsyncConsumerMetricsTest.java          | 167 ++++++++++-----------
 16 files changed, 237 insertions(+), 189 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 1ca42dbc75c..5512c962606 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -78,6 +78,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChang
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
 import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -143,7 +144,7 @@ import java.util.stream.Collectors;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
@@ -323,7 +324,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final AtomicReference<Optional<ConsumerGroupMetadata>> 
groupMetadata = new AtomicReference<>(Optional.empty());
-    private final AsyncConsumerMetrics kafkaConsumerMetrics;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
+    private final KafkaConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
@@ -434,7 +436,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.clientTelemetryReporter = 
CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
             this.metrics = createMetrics(config, time, reporters);
-            this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
+            this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP);
+            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
             this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
             this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
 
@@ -458,7 +461,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.backgroundEventHandler = new BackgroundEventHandler(
                 backgroundEventQueue,
                 time,
-                kafkaConsumerMetrics
+                asyncConsumerMetrics
             );
 
             // This FetchBuffer is shared between the application and network 
threads.
@@ -473,7 +476,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
                     backgroundEventHandler,
                     false,
-                    kafkaConsumerMetrics
+                    asyncConsumerMetrics
             );
             this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
             this.groupMetadata.set(initializeGroupMetadata(config, 
groupRebalanceConfig));
@@ -506,7 +509,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     applicationEventProcessorSupplier,
                     networkClientDelegateSupplier,
                     requestManagersSupplier,
-                    kafkaConsumerMetrics
+                    asyncConsumerMetrics
             );
             this.rebalanceListenerInvoker = new 
ConsumerRebalanceListenerInvoker(
                     logContext,
@@ -584,14 +587,15 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
-        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP);
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
         this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
         this.backgroundEventHandler = new BackgroundEventHandler(
             backgroundEventQueue,
             time,
-            kafkaConsumerMetrics
+            asyncConsumerMetrics
         );
     }
 
@@ -619,7 +623,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.clientTelemetryReporter = Optional.empty();
 
-        ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+        ConsumerMetrics metricsRegistry = new ConsumerMetrics();
         FetchMetricsManager fetchMetricsManager = new 
FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
         this.fetchCollector = new FetchCollector<>(logContext,
                 metadata,
@@ -628,7 +632,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 deserializers,
                 fetchMetricsManager,
                 time);
-        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP);
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
 
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
             config,
@@ -642,7 +647,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.backgroundEventHandler = new BackgroundEventHandler(
             backgroundEventQueue,
             time,
-            kafkaConsumerMetrics
+            asyncConsumerMetrics
         );
         this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
             logContext,
@@ -659,7 +664,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             metadata,
             backgroundEventHandler,
             false,
-            kafkaConsumerMetrics
+            asyncConsumerMetrics
         );
         this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
         Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(
@@ -693,7 +698,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
                 requestManagersSupplier,
-                kafkaConsumerMetrics);
+                asyncConsumerMetrics);
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
     }
@@ -1489,6 +1494,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
         closeQuietly(interceptors, "consumer interceptors", firstException);
         closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics", 
firstException);
+        closeQuietly(asyncConsumerMetrics, "async consumer metrics", 
firstException);
         closeQuietly(metrics, "consumer metrics", firstException);
         closeQuietly(deserializers, "consumer deserializers", firstException);
         clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, 
"async consumer telemetry reporter", firstException));
@@ -2124,7 +2130,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         if (!events.isEmpty()) {
             long startMs = time.milliseconds();
             for (BackgroundEvent event : events) {
-                
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
+                
asyncConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
                 try {
                     if (event instanceof CompletableEvent)
                         backgroundEventReaper.add((CompletableEvent<?>) event);
@@ -2137,7 +2143,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                         log.warn("An error occurred when processing the 
background event: {}", e.getMessage(), e);
                 }
             }
-            
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
 - startMs);
+            
asyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
 - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());
@@ -2270,10 +2276,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     @Override
-    public AsyncConsumerMetrics kafkaConsumerMetrics() {
+    public KafkaConsumerMetrics kafkaConsumerMetrics() {
         return kafkaConsumerMetrics;
     }
 
+    AsyncConsumerMetrics asyncConsumerMetrics() {
+        return asyncConsumerMetrics;
+    }
+
     // Visible for testing
     SubscriptionState subscriptions() {
         return subscriptions;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 0e4119b9e33..787d710535e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -256,7 +256,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     retryBackoffMs,
                     retryBackoffMaxMs);
 
-            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP_PREFIX);
+            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
 
             config.logUnused();
             AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, 
metrics, time.milliseconds());
@@ -296,7 +296,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.assignors = assignors;
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP_PREFIX);
+        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
         this.interceptors = new 
ConsumerInterceptors<>(Collections.emptyList(), metrics);
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -361,7 +361,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         int maxPollRecords = 
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
         boolean checkCrcs = 
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
 
-        ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+        ConsumerMetrics metricsRegistry = new ConsumerMetrics();
         FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, 
metricsRegistry.fetcherMetrics);
         ApiVersions apiVersions = new ApiVersions();
         FetchConfig fetchConfig = new FetchConfig(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
index 19e9a7e8320..3aa0bbcfbcf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+
 public class ConsumerMetrics {
     
     public FetchMetricsRegistry fetcherMetrics;
@@ -32,8 +34,8 @@ public class ConsumerMetrics {
         this.fetcherMetrics = new FetchMetricsRegistry(metricsTags, 
metricGrpPrefix);
     }
 
-    public ConsumerMetrics(String metricGroupPrefix) {
-        this(new HashSet<>(), metricGroupPrefix);
+    public ConsumerMetrics() {
+        this(new HashSet<>(), CONSUMER_METRIC_GROUP_PREFIX);
     }
 
     private List<MetricNameTemplate> getAllTemplates() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index e4b0fa924c0..c07a6747559 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -69,6 +69,7 @@ public final class ConsumerUtils {
     public static final String COORDINATOR_METRICS_SUFFIX = 
"-coordinator-metrics";
     public static final String CONSUMER_METRICS_SUFFIX = "-metrics";
     public static final String CONSUMER_METRIC_GROUP = 
CONSUMER_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
+    public static final String CONSUMER_SHARE_METRIC_GROUP = 
CONSUMER_SHARE_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
 
     /**
      * A fixed, large enough value will suffice for max.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 32663249e7a..a3f0d8ee808 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -100,7 +100,7 @@ import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createShareFetchMetricsManager;
@@ -247,7 +247,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             this.clientTelemetryReporter = 
CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
             this.metrics = createMetrics(config, time, reporters);
-            this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+            this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
 
             this.acknowledgementMode = initializeAcknowledgementMode(config, 
log);
             this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
@@ -323,7 +323,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                     new FetchConfig(config),
                     deserializers);
 
-            this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+            this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics);
 
             config.logUnused();
             AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, 
metrics, time.milliseconds());
@@ -366,7 +366,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.fetchBuffer = new ShareFetchBuffer(logContext);
         this.completedAcknowledgements = new LinkedList<>();
 
-        ShareConsumerMetrics metricsRegistry = new 
ShareConsumerMetrics(CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+        ShareConsumerMetrics metricsRegistry = new ShareConsumerMetrics();
         ShareFetchMetricsManager shareFetchMetricsManager = new 
ShareFetchMetricsManager(metrics, metricsRegistry.shareFetchMetrics);
         this.fetchCollector = new ShareFetchCollector<>(
                 logContext,
@@ -374,8 +374,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 subscriptions,
                 new FetchConfig(config),
                 deserializers);
-        this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
-        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+        this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
 
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
         final BlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
@@ -464,10 +464,10 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.applicationEventHandler = applicationEventHandler;
-        this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+        this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics);
         this.clientTelemetryReporter = Optional.empty();
         this.completedAcknowledgements = Collections.emptyList();
-        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
     }
 
     // auxiliary interface for testing
@@ -1116,19 +1116,23 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
 
         LinkedList<BackgroundEvent> events = new LinkedList<>();
         backgroundEventQueue.drainTo(events);
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                
asyncConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
 
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = 
ConsumerUtils.maybeWrapAsKafkaException(t);
 
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background 
event: {}", e.getMessage(), e);
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the 
background event: {}", e.getMessage(), e);
+                }
             }
+            
asyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
 - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
index 41a41818dee..ee02dfcc17b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
 import java.util.HashSet;
 import java.util.Set;
 
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
+
 public class ShareConsumerMetrics {
     public ShareFetchMetricsRegistry shareFetchMetrics;
 
@@ -26,7 +28,7 @@ public class ShareConsumerMetrics {
         this.shareFetchMetrics = new ShareFetchMetricsRegistry(metricsTags, 
metricGrpPrefix);
     }
 
-    public ShareConsumerMetrics(String metricGroupPrefix) {
-        this(new HashSet<>(), metricGroupPrefix);
+    public ShareConsumerMetrics() {
+        this(new HashSet<>(), CONSUMER_SHARE_METRIC_GROUP_PREFIX);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
index 09e84cbe985..2f90440a662 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
@@ -24,10 +24,7 @@ import org.apache.kafka.common.metrics.stats.Value;
 
 import java.util.Arrays;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
-
-public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements 
AutoCloseable {
+public class AsyncConsumerMetrics implements AutoCloseable {
     private final Metrics metrics;
 
     public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = 
"time-between-network-thread-poll";
@@ -51,15 +48,13 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
     private final Sensor unsentRequestsQueueSizeSensor;
     private final Sensor unsentRequestsQueueTimeSensor;
 
-    public AsyncConsumerMetrics(Metrics metrics) {
-        super(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-
+    public AsyncConsumerMetrics(Metrics metrics, String groupName) {
         this.metrics = metrics;
         this.timeBetweenNetworkThreadPollSensor = 
metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
         this.timeBetweenNetworkThreadPollSensor.add(
             metrics.metricName(
                 "time-between-network-thread-poll-avg",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The average time taken, in milliseconds, between each poll in 
the network thread."
             ),
             new Avg()
@@ -67,7 +62,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.timeBetweenNetworkThreadPollSensor.add(
             metrics.metricName(
                 "time-between-network-thread-poll-max",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The maximum time taken, in milliseconds, between each poll in 
the network thread."
             ),
             new Max()
@@ -77,7 +72,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.applicationEventQueueSizeSensor.add(
             metrics.metricName(
                 APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The current number of events in the queue to send from the 
application thread to the background thread."
             ),
             new Value()
@@ -87,7 +82,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.applicationEventQueueTimeSensor.add(
             metrics.metricName(
                 "application-event-queue-time-avg",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The average time, in milliseconds, that application events 
are taking to be dequeued."
             ),
             new Avg()
@@ -95,7 +90,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.applicationEventQueueTimeSensor.add(
             metrics.metricName(
                 "application-event-queue-time-max",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The maximum time, in milliseconds, that an application event 
took to be dequeued."
             ),
             new Max()
@@ -105,14 +100,14 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.applicationEventQueueProcessingTimeSensor.add(
             metrics.metricName(
                 "application-event-queue-processing-time-avg",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The average time, in milliseconds, that the background thread 
takes to process all available application events."
             ),
             new Avg()
         );
         this.applicationEventQueueProcessingTimeSensor.add(
             metrics.metricName("application-event-queue-processing-time-max",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The maximum time, in milliseconds, that the background thread 
took to process all available application events."
             ),
             new Max()
@@ -122,7 +117,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.applicationEventExpiredSizeSensor.add(
             metrics.metricName(
                 APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME,
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The current number of expired application events."
             ),
             new Value()
@@ -132,7 +127,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.unsentRequestsQueueSizeSensor.add(
             metrics.metricName(
                 UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME,
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The current number of unsent requests in the background 
thread."
             ),
             new Value()
@@ -142,7 +137,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.unsentRequestsQueueTimeSensor.add(
             metrics.metricName(
                 "unsent-requests-queue-time-avg",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The average time, in milliseconds, that requests are taking 
to be sent in the background thread."
             ),
             new Avg()
@@ -150,7 +145,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.unsentRequestsQueueTimeSensor.add(
             metrics.metricName(
                 "unsent-requests-queue-time-max",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The maximum time, in milliseconds, that a request remained 
unsent in the background thread."
             ),
             new Max()
@@ -160,7 +155,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.backgroundEventQueueSizeSensor.add(
             metrics.metricName(
                 BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The current number of events in the queue to send from the 
background thread to the application thread."
             ),
             new Value()
@@ -170,7 +165,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.backgroundEventQueueTimeSensor.add(
             metrics.metricName(
                 "background-event-queue-time-avg",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The average time, in milliseconds, that background events are 
taking to be dequeued."
             ),
             new Avg()
@@ -178,7 +173,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.backgroundEventQueueTimeSensor.add(
             metrics.metricName(
                 "background-event-queue-time-max",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The maximum time, in milliseconds, that background events are 
taking to be dequeued."
             ),
             new Max()
@@ -188,7 +183,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.backgroundEventQueueProcessingTimeSensor.add(
             metrics.metricName(
                 "background-event-queue-processing-time-avg",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The average time, in milliseconds, that the consumer took to 
process all available background events."
             ),
             new Avg()
@@ -196,7 +191,7 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
         this.backgroundEventQueueProcessingTimeSensor.add(
             metrics.metricName(
                 "background-event-queue-processing-time-max",
-                CONSUMER_METRIC_GROUP,
+                groupName,
                 "The maximum time, in milliseconds, that the consumer took to 
process all available background events."
             ),
             new Max()
@@ -257,6 +252,5 @@ public class AsyncConsumerMetrics extends 
KafkaConsumerMetrics implements AutoCl
             unsentRequestsQueueSizeSensor.name(),
             unsentRequestsQueueTimeSensor.name()
         ).forEach(metrics::removeSensor);
-        super.close();
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
index 52502e714a9..1b2bb4518f9 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 
 public class KafkaConsumerMetrics implements AutoCloseable {
     private final Metrics metrics;
@@ -39,9 +39,9 @@ public class KafkaConsumerMetrics implements AutoCloseable {
     private long pollStartMs;
     private long timeSinceLastPollMs;
 
-    public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+    public KafkaConsumerMetrics(Metrics metrics) {
         this.metrics = metrics;
-        final String metricGroupName = metricGrpPrefix + 
CONSUMER_METRICS_SUFFIX;
+        final String metricGroupName = CONSUMER_METRIC_GROUP;
         Measurable lastPoll = (mConfig, now) -> {
             if (lastPollMs == 0L)
                 // if no poll is ever triggered, just return -1.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
index b7da8245aaa..e154b97da5a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
 
 public class KafkaShareConsumerMetrics implements AutoCloseable {
     private final Metrics metrics;
@@ -36,9 +36,9 @@ public class KafkaShareConsumerMetrics implements 
AutoCloseable {
     private long pollStartMs;
     private long timeSinceLastPollMs;
 
-    public KafkaShareConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+    public KafkaShareConsumerMetrics(Metrics metrics) {
         this.metrics = metrics;
-        final String metricGroupName = metricGrpPrefix + 
CONSUMER_METRICS_SUFFIX;
+        final String metricGroupName = CONSUMER_SHARE_METRIC_GROUP;
         Measurable lastPoll = (mConfig, now) -> {
             if (lastPollMs == 0L)
                 // if no poll is ever triggered, just return -1.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
index 3430719b16e..402697227ee 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -27,7 +27,8 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -44,10 +45,11 @@ public class ApplicationEventHandlerTest {
     private final RequestManagers requestManagers = 
mock(RequestManagers.class);
     private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
 
-    @Test
-    public void testRecordApplicationEventQueueSize() {
+    @ParameterizedTest
+    
@MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+    public void testRecordApplicationEventQueueSize(String groupName) {
         try (Metrics metrics = new Metrics();
-             AsyncConsumerMetrics asyncConsumerMetrics = spy(new 
AsyncConsumerMetrics(metrics));
+             AsyncConsumerMetrics asyncConsumerMetrics = spy(new 
AsyncConsumerMetrics(metrics, groupName));
              ApplicationEventHandler applicationEventHandler = new 
ApplicationEventHandler(
                      new LogContext(),
                      time,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c03d1553ac5..6a5cb871fd1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -2015,12 +2015,12 @@ public class AsyncKafkaConsumerTest {
                 mock(ConsumerRebalanceListenerInvoker.class),
                 mock(SubscriptionState.class));
         Metrics metrics = consumer.metricsRegistry();
-        AsyncConsumerMetrics kafkaConsumerMetrics = 
consumer.kafkaConsumerMetrics();
+        AsyncConsumerMetrics asyncConsumerMetrics = 
consumer.asyncConsumerMetrics();
 
         ConsumerRebalanceListenerCallbackNeededEvent event = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, 
Collections.emptySortedSet());
         event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
-        kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
+        asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
 
         time.sleep(10);
         consumer.processBackgroundEvents();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
index 63269b6f554..7a999e51163 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
@@ -23,22 +23,23 @@ import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics.BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class BackgroundEventHandlerTest {
     private final BlockingQueue<BackgroundEvent> backgroundEventsQueue =  new 
LinkedBlockingQueue<>();
 
-    @Test
-    public void testRecordBackgroundEventQueueSize() {
+    @ParameterizedTest
+    
@MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+    public void testRecordBackgroundEventQueueSize(String groupName) {
         try (Metrics metrics = new Metrics();
-             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics)) {
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics, groupName)) {
             BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(
                 backgroundEventsQueue,
                 new MockTime(0),
@@ -48,7 +49,7 @@ public class BackgroundEventHandlerTest {
             assertEquals(
                 1,
                 (double) metrics.metric(
-                    
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, 
CONSUMER_METRIC_GROUP)
+                    
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, groupName)
                 ).metricValue()
             );
 
@@ -57,7 +58,7 @@ public class BackgroundEventHandlerTest {
             assertEquals(
                 0,
                 (double) metrics.metric(
-                    
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, 
CONSUMER_METRIC_GROUP)
+                    
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, groupName)
                 ).metricValue()
             );
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 59b0a346a23..35ccb17dfab 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
@@ -42,7 +43,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
 
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -206,10 +206,11 @@ public class ConsumerNetworkThreadTest {
         verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
     }
 
-    @Test
-    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+    @ParameterizedTest
+    
@MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+    public void testRunOnceRecordTimeBetweenNetworkThreadPoll(String 
groupName) {
         try (Metrics metrics = new Metrics();
-             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics, groupName);
              ConsumerNetworkThread consumerNetworkThread = new 
ConsumerNetworkThread(
                      new LogContext(),
                      time,
@@ -228,22 +229,23 @@ public class ConsumerNetworkThreadTest {
             assertEquals(
                 10,
                 (double) metrics.metric(
-                    metrics.metricName("time-between-network-thread-poll-avg", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("time-between-network-thread-poll-avg", 
groupName)
                 ).metricValue()
             );
             assertEquals(
                 10,
                 (double) metrics.metric(
-                    metrics.metricName("time-between-network-thread-poll-max", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("time-between-network-thread-poll-max", 
groupName)
                 ).metricValue()
             );
         }
     }
 
-    @Test
-    public void 
testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
+    @ParameterizedTest
+    
@MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+    public void 
testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime(String 
groupName) {
         try (Metrics metrics = new Metrics();
-             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics, groupName);
              ConsumerNetworkThread consumerNetworkThread = new 
ConsumerNetworkThread(
                      new LogContext(),
                      time,
@@ -266,19 +268,19 @@ public class ConsumerNetworkThreadTest {
             assertEquals(
                 0,
                 (double) metrics.metric(
-                    metrics.metricName("application-event-queue-size", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("application-event-queue-size", 
groupName)
                 ).metricValue()
             );
             assertEquals(
                 10,
                 (double) metrics.metric(
-                    metrics.metricName("application-event-queue-time-avg", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("application-event-queue-time-avg", 
groupName)
                 ).metricValue()
             );
             assertEquals(
                 10,
                 (double) metrics.metric(
-                    metrics.metricName("application-event-queue-time-max", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("application-event-queue-time-max", 
groupName)
                 ).metricValue()
             );
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
index c75ee906e53..22aa33098ee 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
@@ -18,23 +18,27 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Set;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 class KafkaConsumerMetricsTest {
     private static final long METRIC_VALUE = 123L;
-    private static final String CONSUMER_GROUP_PREFIX = "consumer";
     private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
     private static final String COMMIT_SYNC_TIME_TOTAL = 
"commit-sync-time-ns-total";
     private static final String COMMITTED_TIME_TOTAL = 
"committed-time-ns-total";
 
     private final Metrics metrics = new Metrics();
     private final KafkaConsumerMetrics consumerMetrics
-        = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+        = new KafkaConsumerMetrics(metrics);
 
     @Test
     public void shouldRecordCommitSyncTime() {
@@ -64,6 +68,31 @@ class KafkaConsumerMetricsTest {
         assertMetricRemoved(COMMITTED_TIME_TOTAL);
     }
 
+    @Test
+    public void checkMetricsAfterCreation() {
+        Set<MetricName> expectedMetrics = Set.of(
+            metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("commit-sync-time-ns-total", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("committed-time-ns-total", 
CONSUMER_METRIC_GROUP)
+        );
+        expectedMetrics.forEach(
+            metricName -> assertTrue(
+                metrics.metrics().containsKey(metricName),
+                "Missing metric: " + metricName
+            )
+        );
+        consumerMetrics.close();
+        expectedMetrics.forEach(
+                metricName -> assertFalse(
+                        metrics.metrics().containsKey(metricName),
+                        "Metric present after close: " + metricName
+                )
+        );
+    }
+
     private void assertMetricRemoved(final String name) {
         assertNull(metrics.metric(metrics.metricName(name, 
CONSUMER_METRIC_GROUP)));
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
index 4ff967e1f02..24f4aea1c7a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -53,7 +55,6 @@ import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -246,10 +247,11 @@ public class NetworkClientDelegateTest {
         assertEquals(authException, ((ErrorEvent) event).error());
     }
 
-    @Test
-    public void testRecordUnsentRequestsQueueTime() throws Exception {
+    @ParameterizedTest
+    
@MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+    public void testRecordUnsentRequestsQueueTime(String groupName) throws 
Exception {
         try (Metrics metrics = new Metrics();
-             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics, groupName);
              NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(false, asyncConsumerMetrics)) {
             NetworkClientDelegate.UnsentRequest unsentRequest = 
newUnsentFindCoordinatorRequest();
             networkClientDelegate.add(unsentRequest);
@@ -261,19 +263,19 @@ public class NetworkClientDelegateTest {
             assertEquals(
                 0,
                 (double) metrics.metric(
-                    metrics.metricName("unsent-requests-queue-size", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("unsent-requests-queue-size", groupName)
                 ).metricValue()
             );
             assertEquals(
                 10,
                 (double) metrics.metric(
-                    metrics.metricName("unsent-requests-queue-time-avg", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("unsent-requests-queue-time-avg", 
groupName)
                 ).metricValue()
             );
             assertEquals(
                 10,
                 (double) metrics.metric(
-                    metrics.metricName("unsent-requests-queue-time-max", 
CONSUMER_METRIC_GROUP)
+                    metrics.metricName("unsent-requests-queue-time-max", 
groupName)
                 ).metricValue()
             );
         }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
index 27315068e10..d6dd8c30caa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -20,11 +20,14 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Set;
+import java.util.stream.Stream;
 
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -35,6 +38,13 @@ public class AsyncConsumerMetricsTest {
     private final Metrics metrics = new Metrics();
     private AsyncConsumerMetrics consumerMetrics;
 
+    public static Stream<String> groupNameProvider() {
+        return Stream.of(
+            CONSUMER_METRIC_GROUP,
+            CONSUMER_SHARE_METRIC_GROUP
+        );
+    }
+
     @AfterEach
     public void tearDown() {
         if (consumerMetrics != null) {
@@ -43,17 +53,27 @@ public class AsyncConsumerMetricsTest {
         metrics.close();
     }
 
-    @Test
-    public void shouldMetricNames() {
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldMetricNames(String groupName) {
         // create
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         Set<MetricName> expectedMetrics = Set.of(
-            metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
-            metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
-            metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("commit-sync-time-ns-total", CONSUMER_METRIC_GROUP),
-            metrics.metricName("committed-time-ns-total", CONSUMER_METRIC_GROUP)
+            metrics.metricName("time-between-network-thread-poll-avg", groupName),
+            metrics.metricName("time-between-network-thread-poll-max", groupName),
+            metrics.metricName("application-event-queue-size", groupName),
+            metrics.metricName("application-event-queue-time-avg", groupName),
+            metrics.metricName("application-event-queue-time-max", groupName),
+            metrics.metricName("application-event-queue-processing-time-avg", groupName),
+            metrics.metricName("application-event-queue-processing-time-max", groupName),
+            metrics.metricName("unsent-requests-queue-size", groupName),
+            metrics.metricName("unsent-requests-queue-time-avg", groupName),
+            metrics.metricName("unsent-requests-queue-time-max", groupName),
+            metrics.metricName("background-event-queue-size", groupName),
+            metrics.metricName("background-event-queue-time-avg", groupName),
+            metrics.metricName("background-event-queue-time-max", groupName),
+            metrics.metricName("background-event-queue-processing-time-avg", groupName),
+            metrics.metricName("background-event-queue-processing-time-max", groupName)
         );
         expectedMetrics.forEach(
             metricName -> assertTrue(
@@ -62,30 +82,6 @@ public class AsyncConsumerMetricsTest {
             )
         );
 
-        Set<MetricName> expectedConsumerMetrics = Set.of(
-            metrics.metricName("time-between-network-thread-poll-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("time-between-network-thread-poll-max", CONSUMER_METRIC_GROUP),
-            metrics.metricName("application-event-queue-size", CONSUMER_METRIC_GROUP),
-            metrics.metricName("application-event-queue-time-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("application-event-queue-time-max", CONSUMER_METRIC_GROUP),
-            metrics.metricName("application-event-queue-processing-time-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("application-event-queue-processing-time-max", CONSUMER_METRIC_GROUP),
-            metrics.metricName("unsent-requests-queue-size", CONSUMER_METRIC_GROUP),
-            metrics.metricName("unsent-requests-queue-time-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("unsent-requests-queue-time-max", CONSUMER_METRIC_GROUP),
-            metrics.metricName("background-event-queue-size", CONSUMER_METRIC_GROUP),
-            metrics.metricName("background-event-queue-time-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP),
-            metrics.metricName("background-event-queue-processing-time-avg", CONSUMER_METRIC_GROUP),
-            metrics.metricName("background-event-queue-processing-time-max", CONSUMER_METRIC_GROUP)
-        );
-        expectedConsumerMetrics.forEach(
-            metricName -> assertTrue(
-                metrics.metrics().containsKey(metricName),
-                "Missing metric: " + metricName
-            )
-        );
-
         // close
         consumerMetrics.close();
         expectedMetrics.forEach(
@@ -94,28 +90,24 @@ public class AsyncConsumerMetricsTest {
                 "Metric present after close: " + metricName
             )
         );
-        expectedConsumerMetrics.forEach(
-            metricName -> assertFalse(
-                metrics.metrics().containsKey(metricName),
-                "Metric present after close: " + metricName
-            )
-        );
     }
 
-    @Test
-    public void shouldRecordTimeBetweenNetworkThreadPoll() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordTimeBetweenNetworkThreadPoll(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordTimeBetweenNetworkThreadPoll(METRIC_VALUE);
 
         // Then:
-        assertMetricValue("time-between-network-thread-poll-avg");
-        assertMetricValue("time-between-network-thread-poll-max");
+        assertMetricValue("time-between-network-thread-poll-avg", groupName);
+        assertMetricValue("time-between-network-thread-poll-max", groupName);
     }
 
-    @Test
-    public void shouldRecordApplicationEventQueueSize() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordApplicationEventQueueSize(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordApplicationEventQueueSize(10);
 
@@ -124,38 +116,41 @@ public class AsyncConsumerMetricsTest {
             metrics.metric(
                 metrics.metricName(
                     "application-event-queue-size",
-                    CONSUMER_METRIC_GROUP
+                    groupName
                 )
             ).metricValue(),
             (double) 10
         );
     }
 
-    @Test
-    public void shouldRecordApplicationEventQueueTime() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordApplicationEventQueueTime(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordApplicationEventQueueTime(METRIC_VALUE);
 
         // Then:
-        assertMetricValue("application-event-queue-time-avg");
-        assertMetricValue("application-event-queue-time-max");
+        assertMetricValue("application-event-queue-time-avg", groupName);
+        assertMetricValue("application-event-queue-time-max", groupName);
     }
 
-    @Test
-    public void shouldRecordApplicationEventQueueProcessingTime() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordApplicationEventQueueProcessingTime(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordApplicationEventQueueProcessingTime(METRIC_VALUE);
 
         // Then:
-        assertMetricValue("application-event-queue-processing-time-avg");
-        assertMetricValue("application-event-queue-processing-time-max");
+        assertMetricValue("application-event-queue-processing-time-avg", groupName);
+        assertMetricValue("application-event-queue-processing-time-max", groupName);
     }
 
-    @Test
-    public void shouldRecordUnsentRequestsQueueSize() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordUnsentRequestsQueueSize(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordUnsentRequestsQueueSize(10, 100);
 
@@ -164,27 +159,29 @@ public class AsyncConsumerMetricsTest {
             metrics.metric(
                 metrics.metricName(
                     "unsent-requests-queue-size",
-                    CONSUMER_METRIC_GROUP
+                    groupName
                 )
             ).metricValue(),
             (double) 10
         );
     }
 
-    @Test
-    public void shouldRecordUnsentRequestsQueueTime() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordUnsentRequestsQueueTime(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordUnsentRequestsQueueTime(METRIC_VALUE);
 
         // Then:
-        assertMetricValue("unsent-requests-queue-time-avg");
-        assertMetricValue("unsent-requests-queue-time-max");
+        assertMetricValue("unsent-requests-queue-time-avg", groupName);
+        assertMetricValue("unsent-requests-queue-time-max", groupName);
     }
 
-    @Test
-    public void shouldRecordBackgroundEventQueueSize() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordBackgroundEventQueueSize(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordBackgroundEventQueueSize(10);
 
@@ -193,41 +190,43 @@ public class AsyncConsumerMetricsTest {
             metrics.metric(
                 metrics.metricName(
                     "background-event-queue-size",
-                    CONSUMER_METRIC_GROUP
+                    groupName
                 )
             ).metricValue(),
             (double) 10
         );
     }
 
-    @Test
-    public void shouldRecordBackgroundEventQueueTime() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordBackgroundEventQueueTime(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordBackgroundEventQueueTime(METRIC_VALUE);
 
         // Then:
-        assertMetricValue("background-event-queue-time-avg");
-        assertMetricValue("background-event-queue-time-max");
+        assertMetricValue("background-event-queue-time-avg", groupName);
+        assertMetricValue("background-event-queue-time-max", groupName);
     }
 
-    @Test
-    public void shouldRecordBackgroundEventQueueProcessingTime() {
-        consumerMetrics = new AsyncConsumerMetrics(metrics);
+    @ParameterizedTest
+    @MethodSource("groupNameProvider")
+    public void shouldRecordBackgroundEventQueueProcessingTime(String groupName) {
+        consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
         // When:
         consumerMetrics.recordBackgroundEventQueueProcessingTime(METRIC_VALUE);
 
         // Then:
-        assertMetricValue("background-event-queue-processing-time-avg");
-        assertMetricValue("background-event-queue-processing-time-avg");
+        assertMetricValue("background-event-queue-processing-time-avg", groupName);
+        assertMetricValue("background-event-queue-processing-time-max", groupName);
     }
 
-    private void assertMetricValue(final String name) {
+    private void assertMetricValue(final String name, final String groupName) {
         assertEquals(
             metrics.metric(
                 metrics.metricName(
                     name,
-                    CONSUMER_METRIC_GROUP
+                    groupName
                 )
             ).metricValue(),
             (double) METRIC_VALUE

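For illustration only, a minimal hand-written sketch (not part of the commit) of the constructor shape the updated tests exercise: the caller passes an explicit metric group name, so the share consumer path can register the queue/poll metrics under CONSUMER_SHARE_METRIC_GROUP instead of CONSUMER_METRIC_GROUP, while the recording and close API stays the same. The class name AsyncConsumerMetricsSketch is hypothetical; the imports mirror the test file above and reference internal client classes.

    import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
    import org.apache.kafka.common.metrics.Metrics;

    import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;

    public class AsyncConsumerMetricsSketch {
        public static void main(String[] args) {
            Metrics metrics = new Metrics();

            // Register the async (threading-model) metrics under the share consumer group name.
            AsyncConsumerMetrics shareConsumerMetrics =
                new AsyncConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP);

            // Same recording methods as before; they now report under the chosen group.
            shareConsumerMetrics.recordApplicationEventQueueSize(10);
            shareConsumerMetrics.recordUnsentRequestsQueueSize(10, 100);

            // Closing removes the registered metrics, as the tests above verify.
            shareConsumerMetrics.close();
            metrics.close();
        }
    }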