This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0721d21a57e KAFKA-18415: Fix for event queue metric and flaky test (#18416)
0721d21a57e is described below
commit 0721d21a57ee74c8faab8cab5ec2c06007629689
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Jan 8 14:31:10 2025 +0100
KAFKA-18415: Fix for event queue metric and flaky test (#18416)
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/events/ApplicationEventHandler.java | 4 +++-
.../consumer/internals/events/BackgroundEventHandler.java | 2 +-
.../consumer/internals/ApplicationEventHandlerTest.java | 15 ++++-----------
3 files changed, 8 insertions(+), 13 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index dd6a1666c7b..6ab827b617c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -81,8 +81,10 @@ public class ApplicationEventHandler implements Closeable {
public void add(final ApplicationEvent event) {
Objects.requireNonNull(event, "ApplicationEvent provided to add must be non-null");
event.setEnqueuedMs(time.milliseconds());
+ // Record the updated queue size before actually adding the event to the queue
+ // to avoid race conditions (the background thread is continuously removing from this queue)
+ asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size() + 1);
applicationEventQueue.add(event);
- asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
wakeupNetworkThread();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index adc621d5f2e..3e83908f3df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -53,8 +53,8 @@ public class BackgroundEventHandler {
public void add(BackgroundEvent event) {
Objects.requireNonNull(event, "BackgroundEvent provided to add must be non-null");
event.setEnqueuedMs(time.milliseconds());
+ asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size() + 1);
backgroundEventQueue.add(event);
- asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
index a8ce990a23d..3430719b16e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -32,8 +32,9 @@ import org.junit.jupiter.api.Test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
public class ApplicationEventHandlerTest {
private final Time time = new MockTime();
@@ -46,7 +47,7 @@ public class ApplicationEventHandlerTest {
@Test
public void testRecordApplicationEventQueueSize() {
try (Metrics metrics = new Metrics();
- AsyncConsumerMetrics asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
+ AsyncConsumerMetrics asyncConsumerMetrics = spy(new AsyncConsumerMetrics(metrics));
ApplicationEventHandler applicationEventHandler = new ApplicationEventHandler(
new LogContext(),
time,
@@ -59,15 +60,7 @@ public class ApplicationEventHandlerTest {
)) {
// add event
applicationEventHandler.add(new PollEvent(time.milliseconds()));
- assertEquals(
- 1,
- (double) metrics.metric(
- metrics.metricName(
- AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
- ConsumerUtils.CONSUMER_METRIC_GROUP
- )
- ).metricValue()
- );
+ verify(asyncConsumerMetrics).recordApplicationEventQueueSize(1);
}
}
}