This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29b940bef43 MINOR: Use drainEvents() in
ShareConsumerImpl::processBackgroundEvents (#20474)
29b940bef43 is described below
commit 29b940bef439a1819d9de254eb68f9f7e6684bc3
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Sep 4 09:39:50 2025 -0400
MINOR: Use drainEvents() in ShareConsumerImpl::processBackgroundEvents
(#20474)
*What*
- Currently in `ShareConsumerImpl`, we are not resetting the
`background-event-queue-size` metric to 0 after draining the events from
the queue.
- This PR fixes it by using `BackgroundEventHandler::drainEvents`,
similar to `AsyncKafkaConsumer`.
- Added a unit test to verify the metric is reset to 0 after draining
the events.
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/internals/ShareConsumerImpl.java | 21 ++++++++++++++-------
.../consumer/internals/ShareConsumerImplTest.java | 19 +++++++++++++++++++
2 files changed, 33 insertions(+), 7 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index a3f0d8ee808..12b01b5482e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -170,6 +170,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private final String clientId;
private final String groupId;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+ private final BackgroundEventHandler backgroundEventHandler;
private final BackgroundEventProcessor backgroundEventProcessor;
private final CompletableEventReaper backgroundEventReaper;
private final Deserializers<K, V> deserializers;
@@ -263,7 +264,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ShareFetchMetricsManager shareFetchMetricsManager =
createShareFetchMetricsManager(metrics);
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
- final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(
+ this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
// This FetchBuffer is shared between the application and network
threads.
@@ -378,8 +379,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
- final BlockingQueue<BackgroundEvent> backgroundEventQueue = new
LinkedBlockingQueue<>();
- final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(
+ this.backgroundEventQueue = new LinkedBlockingQueue<>();
+ this.backgroundEventHandler = new BackgroundEventHandler(
backgroundEventQueue, time, asyncConsumerMetrics);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
@@ -419,7 +420,6 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
requestManagersSupplier,
asyncConsumerMetrics);
- this.backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
@@ -468,6 +468,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.clientTelemetryReporter = Optional.empty();
this.completedAcknowledgements = Collections.emptyList();
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics,
CONSUMER_SHARE_METRIC_GROUP);
+ this.backgroundEventHandler = new BackgroundEventHandler(
+ backgroundEventQueue, time, asyncConsumerMetrics);
}
// auxiliary interface for testing
@@ -1110,12 +1112,13 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
* It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the processor
will take a reference to the first
* error, continue to process the remaining events, and then throw the
first error that occurred.
+ *
+ * Visible for testing.
*/
- private boolean processBackgroundEvents() {
+ boolean processBackgroundEvents() {
AtomicReference<KafkaException> firstError = new AtomicReference<>();
- LinkedList<BackgroundEvent> events = new LinkedList<>();
- backgroundEventQueue.drainTo(events);
+ List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
if (!events.isEmpty()) {
long startMs = time.milliseconds();
for (BackgroundEvent event : events) {
@@ -1234,6 +1237,10 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
return metrics;
}
+ AsyncConsumerMetrics asyncConsumerMetrics() {
+ return asyncConsumerMetrics;
+ }
+
@Override
public KafkaShareConsumerMetrics kafkaShareConsumerMetrics() {
return kafkaShareConsumerMetrics;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 09fc99d8e24..0fa3def7c15 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -26,11 +26,13 @@ import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -76,6 +78,7 @@ import java.util.function.Predicate;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -779,6 +782,22 @@ public class ShareConsumerImplTest {
assertEquals(1000, timer.remainingMs());
}
+ @Test
+ public void testRecordBackgroundEventQueueSize() {
+ consumer = newConsumer();
+ Metrics metrics = consumer.metricsRegistry();
+ AsyncConsumerMetrics asyncConsumerMetrics =
consumer.asyncConsumerMetrics();
+
+ ShareAcknowledgementCommitCallbackEvent event = new
ShareAcknowledgementCommitCallbackEvent(Map.of());
+ backgroundEventQueue.add(event);
+ asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
+
+ assertEquals(1, (double)
metrics.metric(metrics.metricName("background-event-queue-size",
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
+
+ consumer.processBackgroundEvents();
+ assertEquals(0, (double)
metrics.metric(metrics.metricName("background-event-queue-size",
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
+ }
+
/**
* Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer,
Predicate) processBackgroundEvents}
* handles the case where the {@link Future} does not complete within the
timeout.