This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d7bc43ed06b KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) 
(#16144)
d7bc43ed06b is described below

commit d7bc43ed06bbdb0864863541df51544dbe234e5d
Author: Jeff Kim <kimkb2...@gmail.com>
AuthorDate: Tue Jun 4 02:27:35 2024 -0400

    KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) (#16144)
    
    We have revamped the thread idle ratio metric in 
https://github.com/apache/kafka/pull/15835. 
https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a 
case where the metric loses accuracy and in order to set a lower bound to the 
accuracy, this patch re-adds a poll with a timeout that was removed as part of 
https://github.com/apache/kafka/pull/15430.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../group/runtime/EventAccumulator.java            | 35 ++++++++--------------
 .../group/runtime/MultiThreadedEventProcessor.java |  8 ++++-
 .../group/runtime/EventAccumulatorTest.java        | 18 +++++++----
 .../runtime/MultiThreadedEventProcessorTest.java   |  4 +--
 4 files changed, 33 insertions(+), 32 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
index 2c22232c47a..cc1ab69cd7e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -162,43 +163,31 @@ public class EventAccumulator<K, T extends 
EventAccumulator.Event<K>> implements
     }
 
     /**
-     * Returns the next {{@link Event}} available or null if no event is
-     * available.
+     * Immediately returns the next {{@link Event}} available or null
+     * if the accumulator is empty.
      *
      * @return The next event available or null.
      */
     public T poll() {
-        lock.lock();
-        try {
-            K key = randomKey();
-            if (key == null) return null;
-
-            Deque<T> queue = queues.get(key);
-            T event = queue.poll();
-
-            if (queue.isEmpty()) queues.remove(key);
-            inflightKeys.add(key);
-            size--;
-
-            return event;
-        } finally {
-            lock.unlock();
-        }
+        return poll(0, TimeUnit.MILLISECONDS);
     }
 
     /**
-     * Returns the next {{@link Event}} available. This method blocks until an
-     * event is available or accumulator is closed.
+     * Returns the next {{@link Event}} available. This method blocks for the 
provided
+     * time and returns null of no event is available.
      *
+     * @param timeout   The timeout.
+     * @param unit      The timeout unit.
      * @return The next event available or null.
      */
-    public T take() {
+    public T poll(long timeout, TimeUnit unit) {
         lock.lock();
         try {
             K key = randomKey();
-            while (key == null && !closed) {
+            long nanos = unit.toNanos(timeout);
+            while (key == null && !closed && nanos > 0) {
                 try {
-                    condition.await();
+                    nanos = condition.awaitNanos(nanos);
                 } catch (InterruptedException e) {
                     // Ignore.
                 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index 31fa52ea7d1..6265334872a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -25,6 +25,7 @@ import org.slf4j.Logger;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -34,6 +35,11 @@ import java.util.stream.IntStream;
  */
 public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
 
+    /**
+     * The poll timeout to wait for an event by the EventProcessorThread.
+     */
+    private static final long POLL_TIMEOUT_MS = 300L;
+
     /**
      * The logger.
      */
@@ -129,7 +135,7 @@ public class MultiThreadedEventProcessor implements 
CoordinatorEventProcessor {
                 // time should be discounted by # threads.
 
                 long idleStartTimeMs = time.milliseconds();
-                CoordinatorEvent event = accumulator.take();
+                CoordinatorEvent event = accumulator.poll(POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
                 long idleEndTimeMs = time.milliseconds();
                 long idleTimeMs = idleEndTimeMs - idleStartTimeMs;
                 metrics.recordThreadIdleTime(idleTimeMs / threads.size());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
index faac0f46f6a..602614414a4 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java
@@ -190,9 +190,12 @@ public class EventAccumulatorTest {
         MockEvent event1 = new MockEvent(1, 1);
         MockEvent event2 = new MockEvent(1, 2);
 
-        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(accumulator::take);
-        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(accumulator::take);
-        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(accumulator::take);
+        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(() ->
+            accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(() ->
+            accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(() ->
+            accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
         List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, 
future1, future2);
 
         assertFalse(future0.isDone());
@@ -245,9 +248,12 @@ public class EventAccumulatorTest {
     public void testCloseUnblockWaitingThreads() throws ExecutionException, 
InterruptedException, TimeoutException {
         EventAccumulator<Integer, MockEvent> accumulator = new 
EventAccumulator<>();
 
-        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(accumulator::take);
-        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(accumulator::take);
-        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(accumulator::take);
+        CompletableFuture<MockEvent> future0 = 
CompletableFuture.supplyAsync(() ->
+            accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+        CompletableFuture<MockEvent> future1 = 
CompletableFuture.supplyAsync(() ->
+            accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
+        CompletableFuture<MockEvent> future2 = 
CompletableFuture.supplyAsync(() ->
+            accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS));
 
         assertFalse(future0.isDone());
         assertFalse(future1.isDone());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
index 0f2801daec3..0b8f04fe347 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java
@@ -60,8 +60,8 @@ public class MultiThreadedEventProcessorTest {
         }
 
         @Override
-        public CoordinatorEvent take() {
-            CoordinatorEvent event = super.take();
+        public CoordinatorEvent poll(long timeout, TimeUnit unit) {
+            CoordinatorEvent event = super.poll(timeout, unit);
             time.sleep(takeDelayMs);
             return event;
         }

Reply via email to