This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0721d21a57e KAFKA-18415: Fix for event queue metric and flaky test (#18416)
0721d21a57e is described below

commit 0721d21a57ee74c8faab8cab5ec2c06007629689
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Jan 8 14:31:10 2025 +0100

    KAFKA-18415: Fix for event queue metric and flaky test (#18416)
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/events/ApplicationEventHandler.java         |  4 +++-
 .../consumer/internals/events/BackgroundEventHandler.java |  2 +-
 .../consumer/internals/ApplicationEventHandlerTest.java   | 15 ++++-----------
 3 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index dd6a1666c7b..6ab827b617c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -81,8 +81,10 @@ public class ApplicationEventHandler implements Closeable {
     public void add(final ApplicationEvent event) {
         Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
         event.setEnqueuedMs(time.milliseconds());
+        // Record the updated queue size before actually adding the event to the queue
+        // to avoid race conditions (the background thread is continuously removing from this queue)
+        asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1);
         applicationEventQueue.add(event);
-        asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
         wakeupNetworkThread();
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index adc621d5f2e..3e83908f3df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -53,8 +53,8 @@ public class BackgroundEventHandler {
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
         event.setEnqueuedMs(time.milliseconds());
+        asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size() + 1);
         backgroundEventQueue.add(event);
-        asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
     }
 
     /**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
index a8ce990a23d..3430719b16e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -32,8 +32,9 @@ import org.junit.jupiter.api.Test;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 public class ApplicationEventHandlerTest {
     private final Time time = new MockTime();
@@ -46,7 +47,7 @@ public class ApplicationEventHandlerTest {
     @Test
     public void testRecordApplicationEventQueueSize() {
         try (Metrics metrics = new Metrics();
-             AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+             AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics));
              ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
                      new LogContext(),
                      time,
@@ -59,15 +60,7 @@ public class ApplicationEventHandlerTest {
              )) {
             // add event
             applicationEventHandler.add(new PollEvent(time.milliseconds()));
-            assertEquals(
-                1,
-                (double) metrics.metric(
-                    metrics.metricName(
-                        AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
-                        ConsumerUtils.CONSUMER_METRIC_GROUP
-                    )
-                ).metricValue()
-            );
+            verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
         }
     }
 }

Reply via email to