This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d226b435972 KAFKA-18220: Refactor AsyncConsumerMetrics to not extend
KafkaConsumerMetrics (#20283)
d226b435972 is described below
commit d226b435972c2b0bac99668c17fa8942a1d431bf
Author: Shivsundar R <[email protected]>
AuthorDate: Wed Sep 3 07:35:55 2025 -0400
KAFKA-18220: Refactor AsyncConsumerMetrics to not extend
KafkaConsumerMetrics (#20283)
*What*
https://issues.apache.org/jira/browse/KAFKA-18220
- Currently, `AsyncConsumerMetrics` extends `KafkaConsumerMetrics`, but
is being used both by `AsyncKafkaConsumer` and `ShareConsumerImpl`.
- `ShareConsumerImpl` only needs the async consumer metrics (the metrics
associated with the new consumer threading model).
- This needs to be fixed: `KafkaConsumerMetrics` is unnecessarily a
parent class for the `ShareConsumer` metrics.
Fix:
- In this PR, we have removed the dependency of `AsyncConsumerMetrics`
on `KafkaConsumerMetrics` and made it an independent class which both
`AsyncKafkaConsumer` and `ShareConsumerImpl` will use.
- The "`asyncConsumerMetrics`" field represents the metrics associated
with the new consumer threading model (like application event queue
size, background queue size, etc).
- The "`kafkaConsumerMetrics`" and "`kafkaShareConsumerMetrics`" fields
denote the actual consumer metrics for `KafkaConsumer` and
`KafkaShareConsumer` respectively.
Reviewers: Andrew Schofield <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 42 ++++--
.../consumer/internals/ClassicKafkaConsumer.java | 6 +-
.../consumer/internals/ConsumerMetrics.java | 6 +-
.../clients/consumer/internals/ConsumerUtils.java | 1 +
.../consumer/internals/ShareConsumerImpl.java | 40 ++---
.../consumer/internals/ShareConsumerMetrics.java | 6 +-
.../internals/metrics/AsyncConsumerMetrics.java | 42 +++---
.../internals/metrics/KafkaConsumerMetrics.java | 6 +-
.../metrics/KafkaShareConsumerMetrics.java | 6 +-
.../internals/ApplicationEventHandlerTest.java | 10 +-
.../consumer/internals/AsyncKafkaConsumerTest.java | 4 +-
.../internals/BackgroundEventHandlerTest.java | 15 +-
.../internals/ConsumerNetworkThreadTest.java | 26 ++--
.../internals/KafkaConsumerMetricsTest.java | 33 +++-
.../internals/NetworkClientDelegateTest.java | 16 +-
.../metrics/AsyncConsumerMetricsTest.java | 167 ++++++++++-----------
16 files changed, 237 insertions(+), 189 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 1ca42dbc75c..5512c962606 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -78,6 +78,7 @@ import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChang
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -143,7 +144,7 @@ import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
import static
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredConsumerInterceptors;
@@ -323,7 +324,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final AtomicReference<Optional<ConsumerGroupMetadata>>
groupMetadata = new AtomicReference<>(Optional.empty());
- private final AsyncConsumerMetrics kafkaConsumerMetrics;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
+ private final KafkaConsumerMetrics kafkaConsumerMetrics;
private Logger log;
private final String clientId;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
@@ -434,7 +436,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
- this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP);
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -458,7 +461,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
- kafkaConsumerMetrics
+ asyncConsumerMetrics
);
// This FetchBuffer is shared between the application and network
threads.
@@ -473,7 +476,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler,
false,
- kafkaConsumerMetrics
+ asyncConsumerMetrics
);
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
this.groupMetadata.set(initializeGroupMetadata(config,
groupRebalanceConfig));
@@ -506,7 +509,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
- kafkaConsumerMetrics
+ asyncConsumerMetrics
);
this.rebalanceListenerInvoker = new
ConsumerRebalanceListenerInvoker(
logContext,
@@ -584,14 +587,15 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
- this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP);
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
- kafkaConsumerMetrics
+ asyncConsumerMetrics
);
}
@@ -619,7 +623,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer, metrics);
this.clientTelemetryReporter = Optional.empty();
- ConsumerMetrics metricsRegistry = new
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+ ConsumerMetrics metricsRegistry = new ConsumerMetrics();
FetchMetricsManager fetchMetricsManager = new
FetchMetricsManager(metrics, metricsRegistry.fetcherMetrics);
this.fetchCollector = new FetchCollector<>(logContext,
metadata,
@@ -628,7 +632,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
deserializers,
fetchMetricsManager,
time);
- this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP);
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -642,7 +647,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue,
time,
- kafkaConsumerMetrics
+ asyncConsumerMetrics
);
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
@@ -659,7 +664,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
metadata,
backgroundEventHandler,
false,
- kafkaConsumerMetrics
+ asyncConsumerMetrics
);
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(
@@ -693,7 +698,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier,
- kafkaConsumerMetrics);
+ asyncConsumerMetrics);
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
}
@@ -1489,6 +1494,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
closeQuietly(interceptors, "consumer interceptors", firstException);
closeQuietly(kafkaConsumerMetrics, "kafka consumer metrics",
firstException);
+ closeQuietly(asyncConsumerMetrics, "async consumer metrics",
firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter,
"async consumer telemetry reporter", firstException));
@@ -2124,7 +2130,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (!events.isEmpty()) {
long startMs = time.milliseconds();
for (BackgroundEvent event : events) {
-
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() -
event.enqueuedMs());
+
asyncConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() -
event.enqueuedMs());
try {
if (event instanceof CompletableEvent)
backgroundEventReaper.add((CompletableEvent<?>) event);
@@ -2137,7 +2143,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
log.warn("An error occurred when processing the
background event: {}", e.getMessage(), e);
}
}
-
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
- startMs);
+
asyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
- startMs);
}
backgroundEventReaper.reap(time.milliseconds());
@@ -2270,10 +2276,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
@Override
- public AsyncConsumerMetrics kafkaConsumerMetrics() {
+ public KafkaConsumerMetrics kafkaConsumerMetrics() {
return kafkaConsumerMetrics;
}
+ AsyncConsumerMetrics asyncConsumerMetrics() {
+ return asyncConsumerMetrics;
+ }
+
// Visible for testing
SubscriptionState subscriptions() {
return subscriptions;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 0e4119b9e33..787d710535e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -256,7 +256,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
retryBackoffMs,
retryBackoffMaxMs);
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP_PREFIX);
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
config.logUnused();
AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId,
metrics, time.milliseconds());
@@ -296,7 +296,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
this.defaultApiTimeoutMs =
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.assignors = assignors;
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP_PREFIX);
+ this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics);
this.interceptors = new
ConsumerInterceptors<>(Collections.emptyList(), metrics);
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -361,7 +361,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
int maxPollRecords =
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
boolean checkCrcs =
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG);
- ConsumerMetrics metricsRegistry = new
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
+ ConsumerMetrics metricsRegistry = new ConsumerMetrics();
FetchMetricsManager metricsManager = new FetchMetricsManager(metrics,
metricsRegistry.fetcherMetrics);
ApiVersions apiVersions = new ApiVersions();
FetchConfig fetchConfig = new FetchConfig(
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
index 19e9a7e8320..3aa0bbcfbcf 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
@@ -24,6 +24,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+
public class ConsumerMetrics {
public FetchMetricsRegistry fetcherMetrics;
@@ -32,8 +34,8 @@ public class ConsumerMetrics {
this.fetcherMetrics = new FetchMetricsRegistry(metricsTags,
metricGrpPrefix);
}
- public ConsumerMetrics(String metricGroupPrefix) {
- this(new HashSet<>(), metricGroupPrefix);
+ public ConsumerMetrics() {
+ this(new HashSet<>(), CONSUMER_METRIC_GROUP_PREFIX);
}
private List<MetricNameTemplate> getAllTemplates() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index e4b0fa924c0..c07a6747559 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -69,6 +69,7 @@ public final class ConsumerUtils {
public static final String COORDINATOR_METRICS_SUFFIX =
"-coordinator-metrics";
public static final String CONSUMER_METRICS_SUFFIX = "-metrics";
public static final String CONSUMER_METRIC_GROUP =
CONSUMER_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
+ public static final String CONSUMER_SHARE_METRIC_GROUP =
CONSUMER_SHARE_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
/**
* A fixed, large enough value will suffice for max.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 32663249e7a..a3f0d8ee808 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -100,7 +100,7 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createShareFetchMetricsManager;
@@ -247,7 +247,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
- this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
this.acknowledgementMode = initializeAcknowledgementMode(config,
log);
this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer, metrics);
@@ -323,7 +323,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
new FetchConfig(config),
deserializers);
- this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+ this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics);
config.logUnused();
AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId,
metrics, time.milliseconds());
@@ -366,7 +366,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.fetchBuffer = new ShareFetchBuffer(logContext);
this.completedAcknowledgements = new LinkedList<>();
- ShareConsumerMetrics metricsRegistry = new
ShareConsumerMetrics(CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+ ShareConsumerMetrics metricsRegistry = new ShareConsumerMetrics();
ShareFetchMetricsManager shareFetchMetricsManager = new
ShareFetchMetricsManager(metrics, metricsRegistry.shareFetchMetrics);
this.fetchCollector = new ShareFetchCollector<>(
logContext,
@@ -374,8 +374,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
subscriptions,
new FetchConfig(config),
deserializers);
- this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
- this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
final BlockingQueue<BackgroundEvent> backgroundEventQueue = new
LinkedBlockingQueue<>();
@@ -464,10 +464,10 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.applicationEventHandler = applicationEventHandler;
- this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+ this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics);
this.clientTelemetryReporter = Optional.empty();
this.completedAcknowledgements = Collections.emptyList();
- this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
}
// auxiliary interface for testing
@@ -1116,19 +1116,23 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
LinkedList<BackgroundEvent> events = new LinkedList<>();
backgroundEventQueue.drainTo(events);
+ if (!events.isEmpty()) {
+ long startMs = time.milliseconds();
+ for (BackgroundEvent event : events) {
+
asyncConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() -
event.enqueuedMs());
+ try {
+ if (event instanceof CompletableEvent)
+ backgroundEventReaper.add((CompletableEvent<?>) event);
- for (BackgroundEvent event : events) {
- try {
- if (event instanceof CompletableEvent)
- backgroundEventReaper.add((CompletableEvent<?>) event);
-
- backgroundEventProcessor.process(event);
- } catch (Throwable t) {
- KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
+ backgroundEventProcessor.process(event);
+ } catch (Throwable t) {
+ KafkaException e =
ConsumerUtils.maybeWrapAsKafkaException(t);
- if (!firstError.compareAndSet(null, e))
- log.warn("An error occurred when processing the background
event: {}", e.getMessage(), e);
+ if (!firstError.compareAndSet(null, e))
+ log.warn("An error occurred when processing the
background event: {}", e.getMessage(), e);
+ }
}
+
asyncConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
- startMs);
}
backgroundEventReaper.reap(time.milliseconds());
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
index 41a41818dee..ee02dfcc17b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerMetrics.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
import java.util.HashSet;
import java.util.Set;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
+
public class ShareConsumerMetrics {
public ShareFetchMetricsRegistry shareFetchMetrics;
@@ -26,7 +28,7 @@ public class ShareConsumerMetrics {
this.shareFetchMetrics = new ShareFetchMetricsRegistry(metricsTags,
metricGrpPrefix);
}
- public ShareConsumerMetrics(String metricGroupPrefix) {
- this(new HashSet<>(), metricGroupPrefix);
+ public ShareConsumerMetrics() {
+ this(new HashSet<>(), CONSUMER_SHARE_METRIC_GROUP_PREFIX);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
index 09e84cbe985..2f90440a662 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
@@ -24,10 +24,7 @@ import org.apache.kafka.common.metrics.stats.Value;
import java.util.Arrays;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
-
-public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements
AutoCloseable {
+public class AsyncConsumerMetrics implements AutoCloseable {
private final Metrics metrics;
public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME =
"time-between-network-thread-poll";
@@ -51,15 +48,13 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
private final Sensor unsentRequestsQueueSizeSensor;
private final Sensor unsentRequestsQueueTimeSensor;
- public AsyncConsumerMetrics(Metrics metrics) {
- super(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-
+ public AsyncConsumerMetrics(Metrics metrics, String groupName) {
this.metrics = metrics;
this.timeBetweenNetworkThreadPollSensor =
metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
this.timeBetweenNetworkThreadPollSensor.add(
metrics.metricName(
"time-between-network-thread-poll-avg",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The average time taken, in milliseconds, between each poll in
the network thread."
),
new Avg()
@@ -67,7 +62,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.timeBetweenNetworkThreadPollSensor.add(
metrics.metricName(
"time-between-network-thread-poll-max",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The maximum time taken, in milliseconds, between each poll in
the network thread."
),
new Max()
@@ -77,7 +72,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.applicationEventQueueSizeSensor.add(
metrics.metricName(
APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
- CONSUMER_METRIC_GROUP,
+ groupName,
"The current number of events in the queue to send from the
application thread to the background thread."
),
new Value()
@@ -87,7 +82,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.applicationEventQueueTimeSensor.add(
metrics.metricName(
"application-event-queue-time-avg",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The average time, in milliseconds, that application events
are taking to be dequeued."
),
new Avg()
@@ -95,7 +90,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.applicationEventQueueTimeSensor.add(
metrics.metricName(
"application-event-queue-time-max",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The maximum time, in milliseconds, that an application event
took to be dequeued."
),
new Max()
@@ -105,14 +100,14 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.applicationEventQueueProcessingTimeSensor.add(
metrics.metricName(
"application-event-queue-processing-time-avg",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The average time, in milliseconds, that the background thread
takes to process all available application events."
),
new Avg()
);
this.applicationEventQueueProcessingTimeSensor.add(
metrics.metricName("application-event-queue-processing-time-max",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The maximum time, in milliseconds, that the background thread
took to process all available application events."
),
new Max()
@@ -122,7 +117,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.applicationEventExpiredSizeSensor.add(
metrics.metricName(
APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME,
- CONSUMER_METRIC_GROUP,
+ groupName,
"The current number of expired application events."
),
new Value()
@@ -132,7 +127,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.unsentRequestsQueueSizeSensor.add(
metrics.metricName(
UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME,
- CONSUMER_METRIC_GROUP,
+ groupName,
"The current number of unsent requests in the background
thread."
),
new Value()
@@ -142,7 +137,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.unsentRequestsQueueTimeSensor.add(
metrics.metricName(
"unsent-requests-queue-time-avg",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The average time, in milliseconds, that requests are taking
to be sent in the background thread."
),
new Avg()
@@ -150,7 +145,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.unsentRequestsQueueTimeSensor.add(
metrics.metricName(
"unsent-requests-queue-time-max",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The maximum time, in milliseconds, that a request remained
unsent in the background thread."
),
new Max()
@@ -160,7 +155,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.backgroundEventQueueSizeSensor.add(
metrics.metricName(
BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
- CONSUMER_METRIC_GROUP,
+ groupName,
"The current number of events in the queue to send from the
background thread to the application thread."
),
new Value()
@@ -170,7 +165,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.backgroundEventQueueTimeSensor.add(
metrics.metricName(
"background-event-queue-time-avg",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The average time, in milliseconds, that background events are
taking to be dequeued."
),
new Avg()
@@ -178,7 +173,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.backgroundEventQueueTimeSensor.add(
metrics.metricName(
"background-event-queue-time-max",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The maximum time, in milliseconds, that background events are
taking to be dequeued."
),
new Max()
@@ -188,7 +183,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.backgroundEventQueueProcessingTimeSensor.add(
metrics.metricName(
"background-event-queue-processing-time-avg",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The average time, in milliseconds, that the consumer took to
process all available background events."
),
new Avg()
@@ -196,7 +191,7 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
this.backgroundEventQueueProcessingTimeSensor.add(
metrics.metricName(
"background-event-queue-processing-time-max",
- CONSUMER_METRIC_GROUP,
+ groupName,
"The maximum time, in milliseconds, that the consumer took to
process all available background events."
),
new Max()
@@ -257,6 +252,5 @@ public class AsyncConsumerMetrics extends
KafkaConsumerMetrics implements AutoCl
unsentRequestsQueueSizeSensor.name(),
unsentRequestsQueueTimeSensor.name()
).forEach(metrics::removeSensor);
- super.close();
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
index 52502e714a9..1b2bb4518f9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaConsumerMetrics.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import java.util.concurrent.TimeUnit;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
public class KafkaConsumerMetrics implements AutoCloseable {
private final Metrics metrics;
@@ -39,9 +39,9 @@ public class KafkaConsumerMetrics implements AutoCloseable {
private long pollStartMs;
private long timeSinceLastPollMs;
- public KafkaConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+ public KafkaConsumerMetrics(Metrics metrics) {
this.metrics = metrics;
- final String metricGroupName = metricGrpPrefix +
CONSUMER_METRICS_SUFFIX;
+ final String metricGroupName = CONSUMER_METRIC_GROUP;
Measurable lastPoll = (mConfig, now) -> {
if (lastPollMs == 0L)
// if no poll is ever triggered, just return -1.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
index b7da8245aaa..e154b97da5a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/KafkaShareConsumerMetrics.java
@@ -25,7 +25,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import java.util.concurrent.TimeUnit;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRICS_SUFFIX;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
public class KafkaShareConsumerMetrics implements AutoCloseable {
private final Metrics metrics;
@@ -36,9 +36,9 @@ public class KafkaShareConsumerMetrics implements
AutoCloseable {
private long pollStartMs;
private long timeSinceLastPollMs;
- public KafkaShareConsumerMetrics(Metrics metrics, String metricGrpPrefix) {
+ public KafkaShareConsumerMetrics(Metrics metrics) {
this.metrics = metrics;
- final String metricGroupName = metricGrpPrefix +
CONSUMER_METRICS_SUFFIX;
+ final String metricGroupName = CONSUMER_SHARE_METRIC_GROUP;
Measurable lastPoll = (mConfig, now) -> {
if (lastPollMs == 0L)
// if no poll is ever triggered, just return -1.
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
index 3430719b16e..402697227ee 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -27,7 +27,8 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,10 +45,11 @@ public class ApplicationEventHandlerTest {
private final RequestManagers requestManagers =
mock(RequestManagers.class);
private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
- @Test
- public void testRecordApplicationEventQueueSize() {
+ @ParameterizedTest
+ @MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+ public void testRecordApplicationEventQueueSize(String groupName) {
try (Metrics metrics = new Metrics();
- AsyncConsumerMetrics asyncConsumerMetrics = spy(new
AsyncConsumerMetrics(metrics));
+ AsyncConsumerMetrics asyncConsumerMetrics = spy(new
AsyncConsumerMetrics(metrics, groupName));
ApplicationEventHandler applicationEventHandler = new
ApplicationEventHandler(
new LogContext(),
time,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index c03d1553ac5..6a5cb871fd1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -2015,12 +2015,12 @@ public class AsyncKafkaConsumerTest {
mock(ConsumerRebalanceListenerInvoker.class),
mock(SubscriptionState.class));
Metrics metrics = consumer.metricsRegistry();
- AsyncConsumerMetrics kafkaConsumerMetrics =
consumer.kafkaConsumerMetrics();
+ AsyncConsumerMetrics asyncConsumerMetrics =
consumer.asyncConsumerMetrics();
ConsumerRebalanceListenerCallbackNeededEvent event = new
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED,
Collections.emptySortedSet());
event.setEnqueuedMs(time.milliseconds());
backgroundEventQueue.add(event);
- kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
+ asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
time.sleep(10);
consumer.processBackgroundEvents();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
index 63269b6f554..7a999e51163 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
@@ -23,22 +23,23 @@ import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics.BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class BackgroundEventHandlerTest {
private final BlockingQueue<BackgroundEvent> backgroundEventsQueue = new
LinkedBlockingQueue<>();
- @Test
- public void testRecordBackgroundEventQueueSize() {
+ @ParameterizedTest
+ @MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+ public void testRecordBackgroundEventQueueSize(String groupName) {
try (Metrics metrics = new Metrics();
- AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics)) {
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics, groupName)) {
BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(
backgroundEventsQueue,
new MockTime(0),
@@ -48,7 +49,7 @@ public class BackgroundEventHandlerTest {
assertEquals(
1,
(double) metrics.metric(
- metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, CONSUMER_METRIC_GROUP)
+ metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, groupName)
).metricValue()
);
@@ -57,7 +58,7 @@ public class BackgroundEventHandlerTest {
assertEquals(
0,
(double) metrics.metric(
- metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, CONSUMER_METRIC_GROUP)
+ metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, groupName)
).metricValue()
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 59b0a346a23..35ccb17dfab 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
@@ -42,7 +43,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -206,10 +206,11 @@ public class ConsumerNetworkThreadTest {
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
}
- @Test
- public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+ @ParameterizedTest
+ @MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+ public void testRunOnceRecordTimeBetweenNetworkThreadPoll(String
groupName) {
try (Metrics metrics = new Metrics();
- AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics, groupName);
ConsumerNetworkThread consumerNetworkThread = new
ConsumerNetworkThread(
new LogContext(),
time,
@@ -228,22 +229,23 @@ public class ConsumerNetworkThreadTest {
assertEquals(
10,
(double) metrics.metric(
- metrics.metricName("time-between-network-thread-poll-avg",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("time-between-network-thread-poll-avg",
groupName)
).metricValue()
);
assertEquals(
10,
(double) metrics.metric(
- metrics.metricName("time-between-network-thread-poll-max",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("time-between-network-thread-poll-max",
groupName)
).metricValue()
);
}
}
- @Test
- public void
testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
+ @ParameterizedTest
+ @MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+ public void
testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime(String
groupName) {
try (Metrics metrics = new Metrics();
- AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics, groupName);
ConsumerNetworkThread consumerNetworkThread = new
ConsumerNetworkThread(
new LogContext(),
time,
@@ -266,19 +268,19 @@ public class ConsumerNetworkThreadTest {
assertEquals(
0,
(double) metrics.metric(
- metrics.metricName("application-event-queue-size",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("application-event-queue-size",
groupName)
).metricValue()
);
assertEquals(
10,
(double) metrics.metric(
- metrics.metricName("application-event-queue-time-avg",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("application-event-queue-time-avg",
groupName)
).metricValue()
);
assertEquals(
10,
(double) metrics.metric(
- metrics.metricName("application-event-queue-time-max",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("application-event-queue-time-max",
groupName)
).metricValue()
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
index c75ee906e53..22aa33098ee 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/KafkaConsumerMetricsTest.java
@@ -18,23 +18,27 @@
package org.apache.kafka.clients.consumer.internals;
import
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.api.Test;
+import java.util.Set;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
class KafkaConsumerMetricsTest {
private static final long METRIC_VALUE = 123L;
- private static final String CONSUMER_GROUP_PREFIX = "consumer";
private static final String CONSUMER_METRIC_GROUP = "consumer-metrics";
private static final String COMMIT_SYNC_TIME_TOTAL =
"commit-sync-time-ns-total";
private static final String COMMITTED_TIME_TOTAL =
"committed-time-ns-total";
private final Metrics metrics = new Metrics();
private final KafkaConsumerMetrics consumerMetrics
- = new KafkaConsumerMetrics(metrics, CONSUMER_GROUP_PREFIX);
+ = new KafkaConsumerMetrics(metrics);
@Test
public void shouldRecordCommitSyncTime() {
@@ -64,6 +68,31 @@ class KafkaConsumerMetricsTest {
assertMetricRemoved(COMMITTED_TIME_TOTAL);
}
+ @Test
+ public void checkMetricsAfterCreation() {
+ Set<MetricName> expectedMetrics = Set.of(
+ metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
+ metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
+ metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
+ metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
+ metrics.metricName("commit-sync-time-ns-total",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("committed-time-ns-total",
CONSUMER_METRIC_GROUP)
+ );
+ expectedMetrics.forEach(
+ metricName -> assertTrue(
+ metrics.metrics().containsKey(metricName),
+ "Missing metric: " + metricName
+ )
+ );
+ consumerMetrics.close();
+ expectedMetrics.forEach(
+ metricName -> assertFalse(
+ metrics.metrics().containsKey(metricName),
+ "Metric present after close: " + metricName
+ )
+ );
+ }
+
private void assertMetricRemoved(final String name) {
assertNull(metrics.metric(metrics.metricName(name,
CONSUMER_METRIC_GROUP)));
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
index 4ff967e1f02..24f4aea1c7a 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -40,6 +40,8 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.ArrayList;
import java.util.Collections;
@@ -53,7 +55,6 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -246,10 +247,11 @@ public class NetworkClientDelegateTest {
assertEquals(authException, ((ErrorEvent) event).error());
}
- @Test
- public void testRecordUnsentRequestsQueueTime() throws Exception {
+ @ParameterizedTest
+ @MethodSource("org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetricsTest#groupNameProvider")
+ public void testRecordUnsentRequestsQueueTime(String groupName) throws
Exception {
try (Metrics metrics = new Metrics();
- AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics, groupName);
NetworkClientDelegate networkClientDelegate =
newNetworkClientDelegate(false, asyncConsumerMetrics)) {
NetworkClientDelegate.UnsentRequest unsentRequest =
newUnsentFindCoordinatorRequest();
networkClientDelegate.add(unsentRequest);
@@ -261,19 +263,19 @@ public class NetworkClientDelegateTest {
assertEquals(
0,
(double) metrics.metric(
- metrics.metricName("unsent-requests-queue-size",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("unsent-requests-queue-size", groupName)
).metricValue()
);
assertEquals(
10,
(double) metrics.metric(
- metrics.metricName("unsent-requests-queue-time-avg",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("unsent-requests-queue-time-avg",
groupName)
).metricValue()
);
assertEquals(
10,
(double) metrics.metric(
- metrics.metricName("unsent-requests-queue-time-max",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("unsent-requests-queue-time-max",
groupName)
).metricValue()
);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
index 27315068e10..d6dd8c30caa 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -20,11 +20,14 @@ import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Set;
+import java.util.stream.Stream;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -35,6 +38,13 @@ public class AsyncConsumerMetricsTest {
private final Metrics metrics = new Metrics();
private AsyncConsumerMetrics consumerMetrics;
+ public static Stream<String> groupNameProvider() {
+ return Stream.of(
+ CONSUMER_METRIC_GROUP,
+ CONSUMER_SHARE_METRIC_GROUP
+ );
+ }
+
@AfterEach
public void tearDown() {
if (consumerMetrics != null) {
@@ -43,17 +53,27 @@ public class AsyncConsumerMetricsTest {
metrics.close();
}
- @Test
- public void shouldMetricNames() {
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldMetricNames(String groupName) {
// create
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
Set<MetricName> expectedMetrics = Set.of(
- metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
- metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
- metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
- metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
- metrics.metricName("commit-sync-time-ns-total",
CONSUMER_METRIC_GROUP),
- metrics.metricName("committed-time-ns-total",
CONSUMER_METRIC_GROUP)
+ metrics.metricName("time-between-network-thread-poll-avg",
groupName),
+ metrics.metricName("time-between-network-thread-poll-max",
groupName),
+ metrics.metricName("application-event-queue-size", groupName),
+ metrics.metricName("application-event-queue-time-avg", groupName),
+ metrics.metricName("application-event-queue-time-max", groupName),
+ metrics.metricName("application-event-queue-processing-time-avg",
groupName),
+ metrics.metricName("application-event-queue-processing-time-max",
groupName),
+ metrics.metricName("unsent-requests-queue-size", groupName),
+ metrics.metricName("unsent-requests-queue-time-avg", groupName),
+ metrics.metricName("unsent-requests-queue-time-max", groupName),
+ metrics.metricName("background-event-queue-size", groupName),
+ metrics.metricName("background-event-queue-time-avg", groupName),
+ metrics.metricName("background-event-queue-time-max", groupName),
+ metrics.metricName("background-event-queue-processing-time-avg",
groupName),
+ metrics.metricName("background-event-queue-processing-time-max",
groupName)
);
expectedMetrics.forEach(
metricName -> assertTrue(
@@ -62,30 +82,6 @@ public class AsyncConsumerMetricsTest {
)
);
- Set<MetricName> expectedConsumerMetrics = Set.of(
- metrics.metricName("time-between-network-thread-poll-avg",
CONSUMER_METRIC_GROUP),
- metrics.metricName("time-between-network-thread-poll-max",
CONSUMER_METRIC_GROUP),
- metrics.metricName("application-event-queue-size",
CONSUMER_METRIC_GROUP),
- metrics.metricName("application-event-queue-time-avg",
CONSUMER_METRIC_GROUP),
- metrics.metricName("application-event-queue-time-max",
CONSUMER_METRIC_GROUP),
- metrics.metricName("application-event-queue-processing-time-avg",
CONSUMER_METRIC_GROUP),
- metrics.metricName("application-event-queue-processing-time-max",
CONSUMER_METRIC_GROUP),
- metrics.metricName("unsent-requests-queue-size",
CONSUMER_METRIC_GROUP),
- metrics.metricName("unsent-requests-queue-time-avg",
CONSUMER_METRIC_GROUP),
- metrics.metricName("unsent-requests-queue-time-max",
CONSUMER_METRIC_GROUP),
- metrics.metricName("background-event-queue-size",
CONSUMER_METRIC_GROUP),
- metrics.metricName("background-event-queue-time-avg",
CONSUMER_METRIC_GROUP),
- metrics.metricName("background-event-queue-time-max",
CONSUMER_METRIC_GROUP),
- metrics.metricName("background-event-queue-processing-time-avg",
CONSUMER_METRIC_GROUP),
- metrics.metricName("background-event-queue-processing-time-max",
CONSUMER_METRIC_GROUP)
- );
- expectedConsumerMetrics.forEach(
- metricName -> assertTrue(
- metrics.metrics().containsKey(metricName),
- "Missing metric: " + metricName
- )
- );
-
// close
consumerMetrics.close();
expectedMetrics.forEach(
@@ -94,28 +90,24 @@ public class AsyncConsumerMetricsTest {
"Metric present after close: " + metricName
)
);
- expectedConsumerMetrics.forEach(
- metricName -> assertFalse(
- metrics.metrics().containsKey(metricName),
- "Metric present after close: " + metricName
- )
- );
}
- @Test
- public void shouldRecordTimeBetweenNetworkThreadPoll() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordTimeBetweenNetworkThreadPoll(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordTimeBetweenNetworkThreadPoll(METRIC_VALUE);
// Then:
- assertMetricValue("time-between-network-thread-poll-avg");
- assertMetricValue("time-between-network-thread-poll-max");
+ assertMetricValue("time-between-network-thread-poll-avg", groupName);
+ assertMetricValue("time-between-network-thread-poll-max", groupName);
}
- @Test
- public void shouldRecordApplicationEventQueueSize() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordApplicationEventQueueSize(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordApplicationEventQueueSize(10);
@@ -124,38 +116,41 @@ public class AsyncConsumerMetricsTest {
metrics.metric(
metrics.metricName(
"application-event-queue-size",
- CONSUMER_METRIC_GROUP
+ groupName
)
).metricValue(),
(double) 10
);
}
- @Test
- public void shouldRecordApplicationEventQueueTime() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordApplicationEventQueueTime(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordApplicationEventQueueTime(METRIC_VALUE);
// Then:
- assertMetricValue("application-event-queue-time-avg");
- assertMetricValue("application-event-queue-time-max");
+ assertMetricValue("application-event-queue-time-avg", groupName);
+ assertMetricValue("application-event-queue-time-max", groupName);
}
- @Test
- public void shouldRecordApplicationEventQueueProcessingTime() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordApplicationEventQueueProcessingTime(String
groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordApplicationEventQueueProcessingTime(METRIC_VALUE);
// Then:
- assertMetricValue("application-event-queue-processing-time-avg");
- assertMetricValue("application-event-queue-processing-time-max");
+ assertMetricValue("application-event-queue-processing-time-avg",
groupName);
+ assertMetricValue("application-event-queue-processing-time-max",
groupName);
}
- @Test
- public void shouldRecordUnsentRequestsQueueSize() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordUnsentRequestsQueueSize(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordUnsentRequestsQueueSize(10, 100);
@@ -164,27 +159,29 @@ public class AsyncConsumerMetricsTest {
metrics.metric(
metrics.metricName(
"unsent-requests-queue-size",
- CONSUMER_METRIC_GROUP
+ groupName
)
).metricValue(),
(double) 10
);
}
- @Test
- public void shouldRecordUnsentRequestsQueueTime() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordUnsentRequestsQueueTime(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordUnsentRequestsQueueTime(METRIC_VALUE);
// Then:
- assertMetricValue("unsent-requests-queue-time-avg");
- assertMetricValue("unsent-requests-queue-time-max");
+ assertMetricValue("unsent-requests-queue-time-avg", groupName);
+ assertMetricValue("unsent-requests-queue-time-max", groupName);
}
- @Test
- public void shouldRecordBackgroundEventQueueSize() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordBackgroundEventQueueSize(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordBackgroundEventQueueSize(10);
@@ -193,41 +190,43 @@ public class AsyncConsumerMetricsTest {
metrics.metric(
metrics.metricName(
"background-event-queue-size",
- CONSUMER_METRIC_GROUP
+ groupName
)
).metricValue(),
(double) 10
);
}
- @Test
- public void shouldRecordBackgroundEventQueueTime() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordBackgroundEventQueueTime(String groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordBackgroundEventQueueTime(METRIC_VALUE);
// Then:
- assertMetricValue("background-event-queue-time-avg");
- assertMetricValue("background-event-queue-time-max");
+ assertMetricValue("background-event-queue-time-avg", groupName);
+ assertMetricValue("background-event-queue-time-max", groupName);
}
- @Test
- public void shouldRecordBackgroundEventQueueProcessingTime() {
- consumerMetrics = new AsyncConsumerMetrics(metrics);
+ @ParameterizedTest
+ @MethodSource("groupNameProvider")
+ public void shouldRecordBackgroundEventQueueProcessingTime(String
groupName) {
+ consumerMetrics = new AsyncConsumerMetrics(metrics, groupName);
// When:
consumerMetrics.recordBackgroundEventQueueProcessingTime(METRIC_VALUE);
// Then:
- assertMetricValue("background-event-queue-processing-time-avg");
- assertMetricValue("background-event-queue-processing-time-avg");
+ assertMetricValue("background-event-queue-processing-time-avg",
groupName);
+ assertMetricValue("background-event-queue-processing-time-max",
groupName);
}
- private void assertMetricValue(final String name) {
+ private void assertMetricValue(final String name, final String groupName) {
assertEquals(
metrics.metric(
metrics.metricName(
name,
- CONSUMER_METRIC_GROUP
+ groupName
)
).metricValue(),
(double) METRIC_VALUE