This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new d7bc43ed06b KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) (#16144) d7bc43ed06b is described below commit d7bc43ed06bbdb0864863541df51544dbe234e5d Author: Jeff Kim <kimkb2...@gmail.com> AuthorDate: Tue Jun 4 02:27:35 2024 -0400 KAFKA-16664; Re-add EventAccumulator.poll(long, TimeUnit) (#16144) We have revamped the thread idle ratio metric in https://github.com/apache/kafka/pull/15835. https://github.com/apache/kafka/pull/15835#discussion_r1588068337 describes a case where the metric loses accuracy and in order to set a lower bound to the accuracy, this patch re-adds a poll with a timeout that was removed as part of https://github.com/apache/kafka/pull/15430. Reviewers: David Jacot <dja...@confluent.io> --- .../group/runtime/EventAccumulator.java | 35 ++++++++-------------- .../group/runtime/MultiThreadedEventProcessor.java | 8 ++++- .../group/runtime/EventAccumulatorTest.java | 18 +++++++---- .../runtime/MultiThreadedEventProcessorTest.java | 4 +-- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java index 2c22232c47a..cc1ab69cd7e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/EventAccumulator.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -162,43 +163,31 @@ public class EventAccumulator<K, T extends EventAccumulator.Event<K>> implements } /** - * Returns the next {{@link Event}} available or null if no 
event is - * available. + * Immediately returns the next {{@link Event}} available or null + * if the accumulator is empty. * * @return The next event available or null. */ public T poll() { - lock.lock(); - try { - K key = randomKey(); - if (key == null) return null; - - Deque<T> queue = queues.get(key); - T event = queue.poll(); - - if (queue.isEmpty()) queues.remove(key); - inflightKeys.add(key); - size--; - - return event; - } finally { - lock.unlock(); - } + return poll(0, TimeUnit.MILLISECONDS); } /** - * Returns the next {{@link Event}} available. This method blocks until an - * event is available or accumulator is closed. + * Returns the next {{@link Event}} available. This method blocks for the provided + * time and returns null if no event is available. * + * @param timeout The timeout. + * @param unit The timeout unit. * @return The next event available or null. */ - public T take() { + public T poll(long timeout, TimeUnit unit) { lock.lock(); try { K key = randomKey(); - while (key == null && !closed) { + long nanos = unit.toNanos(timeout); + while (key == null && !closed && nanos > 0) { try { - condition.await(); + nanos = condition.awaitNanos(nanos); } catch (InterruptedException e) { // Ignore. 
} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java index 31fa52ea7d1..6265334872a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import java.util.List; import java.util.Objects; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -34,6 +35,11 @@ import java.util.stream.IntStream; */ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { + /** + * The poll timeout to wait for an event by the EventProcessorThread. + */ + private static final long POLL_TIMEOUT_MS = 300L; + /** * The logger. */ @@ -129,7 +135,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor { // time should be discounted by # threads. 
long idleStartTimeMs = time.milliseconds(); - CoordinatorEvent event = accumulator.take(); + CoordinatorEvent event = accumulator.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS); long idleEndTimeMs = time.milliseconds(); long idleTimeMs = idleEndTimeMs - idleStartTimeMs; metrics.recordThreadIdleTime(idleTimeMs / threads.size()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java index faac0f46f6a..602614414a4 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/EventAccumulatorTest.java @@ -190,9 +190,12 @@ public class EventAccumulatorTest { MockEvent event1 = new MockEvent(1, 1); MockEvent event2 = new MockEvent(1, 2); - CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take); - CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take); - CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take); + CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(() -> + accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(() -> + accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(() -> + accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); List<CompletableFuture<MockEvent>> futures = Arrays.asList(future0, future1, future2); assertFalse(future0.isDone()); @@ -245,9 +248,12 @@ public class EventAccumulatorTest { public void testCloseUnblockWaitingThreads() throws ExecutionException, InterruptedException, TimeoutException { EventAccumulator<Integer, MockEvent> accumulator = new EventAccumulator<>(); - 
CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(accumulator::take); - CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(accumulator::take); - CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(accumulator::take); + CompletableFuture<MockEvent> future0 = CompletableFuture.supplyAsync(() -> + accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + CompletableFuture<MockEvent> future1 = CompletableFuture.supplyAsync(() -> + accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); + CompletableFuture<MockEvent> future2 = CompletableFuture.supplyAsync(() -> + accumulator.poll(Long.MAX_VALUE, TimeUnit.MILLISECONDS)); assertFalse(future0.isDone()); assertFalse(future1.isDone()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java index 0f2801daec3..0b8f04fe347 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessorTest.java @@ -60,8 +60,8 @@ public class MultiThreadedEventProcessorTest { } @Override - public CoordinatorEvent take() { - CoordinatorEvent event = super.take(); + public CoordinatorEvent poll(long timeout, TimeUnit unit) { + CoordinatorEvent event = super.poll(timeout, unit); time.sleep(takeDelayMs); return event; }