This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29b940bef43 MINOR: Use drainEvents() in 
ShareConsumerImpl::processBackgroundEvents (#20474)
29b940bef43 is described below

commit 29b940bef439a1819d9de254eb68f9f7e6684bc3
Author: Shivsundar R <[email protected]>
AuthorDate: Thu Sep 4 09:39:50 2025 -0400

    MINOR: Use drainEvents() in ShareConsumerImpl::processBackgroundEvents 
(#20474)
    
    *What*
    
    - Currently in `ShareConsumerImpl`, we were not resetting
    `background-event-queue-size` metric to 0 after draining the events from
    the queue.
    - This PR fixes it by using `BackgroundEventHandler::drainEvents`
    similar to `AsyncKafkaConsumer`.
    - Added a unit test to verify the metric is reset to 0 after draining
    the events.
    
    Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 .../consumer/internals/ShareConsumerImpl.java       | 21 ++++++++++++++-------
 .../consumer/internals/ShareConsumerImplTest.java   | 19 +++++++++++++++++++
 2 files changed, 33 insertions(+), 7 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index a3f0d8ee808..12b01b5482e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -170,6 +170,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     private final String clientId;
     private final String groupId;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final BackgroundEventHandler backgroundEventHandler;
     private final BackgroundEventProcessor backgroundEventProcessor;
     private final CompletableEventReaper backgroundEventReaper;
     private final Deserializers<K, V> deserializers;
@@ -263,7 +264,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             ShareFetchMetricsManager shareFetchMetricsManager = 
createShareFetchMetricsManager(metrics);
             ApiVersions apiVersions = new ApiVersions();
             final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
-            final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(
+            this.backgroundEventHandler = new BackgroundEventHandler(
                 backgroundEventQueue, time, asyncConsumerMetrics);
 
             // This FetchBuffer is shared between the application and network 
threads.
@@ -378,8 +379,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
 
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
-        final BlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
-        final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(
+        this.backgroundEventQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventHandler = new BackgroundEventHandler(
             backgroundEventQueue, time, asyncConsumerMetrics);
 
         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
@@ -419,7 +420,6 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 requestManagersSupplier,
                 asyncConsumerMetrics);
 
-        this.backgroundEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
 
@@ -468,6 +468,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.clientTelemetryReporter = Optional.empty();
         this.completedAcknowledgements = Collections.emptyList();
         this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics, 
CONSUMER_SHARE_METRIC_GROUP);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+                backgroundEventQueue, time, asyncConsumerMetrics);
     }
 
     // auxiliary interface for testing
@@ -1110,12 +1112,13 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor 
will take a reference to the first
      * error, continue to process the remaining events, and then throw the 
first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
         if (!events.isEmpty()) {
             long startMs = time.milliseconds();
             for (BackgroundEvent event : events) {
@@ -1234,6 +1237,10 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         return metrics;
     }
 
+    AsyncConsumerMetrics asyncConsumerMetrics() {
+        return asyncConsumerMetrics;
+    }
+
     @Override
     public KafkaShareConsumerMetrics kafkaShareConsumerMetrics() {
         return kafkaShareConsumerMetrics;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 09fc99d8e24..0fa3def7c15 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -26,11 +26,13 @@ import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
 import org.apache.kafka.clients.consumer.internals.events.PollEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgeOnCloseEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackRegistrationEvent;
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.StopFindCoordinatorOnCloseEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
@@ -76,6 +78,7 @@ import java.util.function.Predicate;
 
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -779,6 +782,22 @@ public class ShareConsumerImplTest {
         assertEquals(1000, timer.remainingMs());
     }
 
+    @Test
+    public void testRecordBackgroundEventQueueSize() {
+        consumer = newConsumer();
+        Metrics metrics = consumer.metricsRegistry();
+        AsyncConsumerMetrics asyncConsumerMetrics = 
consumer.asyncConsumerMetrics();
+
+        ShareAcknowledgementCommitCallbackEvent event = new 
ShareAcknowledgementCommitCallbackEvent(Map.of());
+        backgroundEventQueue.add(event);
+        asyncConsumerMetrics.recordBackgroundEventQueueSize(1);
+
+        assertEquals(1, (double) 
metrics.metric(metrics.metricName("background-event-queue-size", 
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
+
+        consumer.processBackgroundEvents();
+        assertEquals(0, (double) 
metrics.metric(metrics.metricName("background-event-queue-size", 
CONSUMER_SHARE_METRIC_GROUP)).metricValue());
+    }
+
     /**
      * Tests {@link ShareConsumerImpl#processBackgroundEvents(Future, Timer, 
Predicate) processBackgroundEvents}
      * handles the case where the {@link Future} does not complete within the 
timeout.

Reply via email to