This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 52e8730613c [fix][test] Fix quiet time implementation in
BrokerTestUtil.receiveMessages (#23876)
52e8730613c is described below
commit 52e8730613c36008ea57a0ca5c10231512232d7e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 22 22:48:46 2025 -0800
[fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMessages
(#23876)
---
.../org/apache/pulsar/broker/BrokerTestUtil.java | 41 +++++---
.../apache/pulsar/broker/BrokerTestUtilTest.java | 115 +++++++++++++++++++++
2 files changed, 143 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 8364cae53b2..e97928c4c66 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -39,10 +39,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
@@ -239,33 +241,46 @@ public class BrokerTestUtil {
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>,
Boolean> messageHandler,
Duration quietTimeout,
Stream<Consumer<T>> consumers) {
+ long quietTimeoutNanos = quietTimeout.toNanos();
+ AtomicLong lastMessageReceivedNanos = new
AtomicLong(System.nanoTime());
FutureUtil.waitForAll(consumers
- .map(consumer -> receiveMessagesAsync(consumer, quietTimeout,
messageHandler)).toList()).join();
+ .map(consumer -> receiveMessagesAsync(consumer,
quietTimeoutNanos, quietTimeoutNanos, messageHandler,
+ lastMessageReceivedNanos)).toList()).join();
}
// asynchronously receive messages from a consumer and handle them using
the provided message handler
// the benefit is that multiple consumers can be concurrently consumed
without the need to have multiple threads
// this is useful in tests where multiple consumers are needed to test the
functionality
- private static <T> CompletableFuture<Void>
receiveMessagesAsync(Consumer<T> consumer, Duration quietTimeout,
-
BiFunction<Consumer<T>, Message<T>, Boolean>
-
messageHandler) {
- CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
- return receiveFuture
- .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS)
+ private static <T> CompletableFuture<Void>
receiveMessagesAsync(Consumer<T> consumer,
+ long
quietTimeoutNanos,
+ long
receiveTimeoutNanos,
+
BiFunction<Consumer<T>, Message<T>, Boolean>
+
messageHandler,
+ AtomicLong
lastMessageReceivedNanos) {
+ return consumer.receiveAsync()
+ .orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS)
.handle((msg, t) -> {
+ long currentNanos = System.nanoTime();
if (t != null) {
if (t instanceof TimeoutException) {
- // cancel the receive future so that Pulsar client
can clean up the resources
- receiveFuture.cancel(false);
- return false;
+ long sinceLastMessageReceivedNanos = currentNanos
- lastMessageReceivedNanos.get();
+ if (sinceLastMessageReceivedNanos >
quietTimeoutNanos) {
+ return Pair.of(false, 0L);
+ } else {
+ return Pair.of(true, quietTimeoutNanos -
sinceLastMessageReceivedNanos);
+ }
} else {
throw FutureUtil.wrapToCompletionException(t);
}
}
- return messageHandler.apply(consumer, msg);
- }).thenComposeAsync(receiveMore -> {
+ lastMessageReceivedNanos.set(currentNanos);
+ return Pair.of(messageHandler.apply(consumer, msg),
quietTimeoutNanos);
+ }).thenComposeAsync(receiveMoreAndNextTimeout -> {
+ boolean receiveMore = receiveMoreAndNextTimeout.getLeft();
if (receiveMore) {
- return receiveMessagesAsync(consumer, quietTimeout,
messageHandler);
+ Long nextReceiveTimeoutNanos =
receiveMoreAndNextTimeout.getRight();
+ return receiveMessagesAsync(consumer,
quietTimeoutNanos, nextReceiveTimeoutNanos,
+ messageHandler, lastMessageReceivedNanos);
} else {
return CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java
new file mode 100644
index 00000000000..90b917a319c
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class BrokerTestUtilTest {
+ @Test
+ public void testReceiveMessagesQuietTime() throws Exception {
+ // Mock consumers
+ Consumer<Integer> consumer1 = mock(Consumer.class);
+ Consumer<Integer> consumer2 = mock(Consumer.class);
+
+ long consumer1DelayMs = 300L;
+ long consumer2DelayMs = 400L;
+ long quietTimeMs = 500L;
+
+ // Define behavior for receiveAsync with delay
+ AtomicBoolean consumer1FutureContinueSupplying = new
AtomicBoolean(true);
+ when(consumer1.receiveAsync()).thenAnswer(invocation -> {
+ if (consumer1FutureContinueSupplying.get()) {
+ CompletableFuture<Message> messageCompletableFuture =
+ CompletableFuture.supplyAsync(() ->
mock(Message.class),
+
CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS));
+ consumer1FutureContinueSupplying.set(false);
+ // resume supplying once the future is cancelled or timed
out
+ FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture,
() -> {
+ consumer1FutureContinueSupplying.set(true);
+ });
+ return messageCompletableFuture;
+ } else {
+ return new CompletableFuture<>();
+ }
+ });
+ AtomicBoolean consumer2FutureContinueSupplying = new
AtomicBoolean(true);
+ when(consumer2.receiveAsync()).thenAnswer(invocation -> {
+ if (consumer2FutureContinueSupplying.get()) {
+ CompletableFuture<Message> messageCompletableFuture =
+ CompletableFuture.supplyAsync(() ->
mock(Message.class),
+
CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS));
+ consumer2FutureContinueSupplying.set(false);
+ // resume supplying once the future is cancelled or timed
out
+ FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture,
() -> {
+ consumer2FutureContinueSupplying.set(true);
+ });
+ return messageCompletableFuture;
+ } else {
+ return new CompletableFuture<>();
+ }
+ });
+
+ // Atomic variables to track message handling
+ AtomicInteger messageCount = new AtomicInteger(0);
+
+ // Message handler
+ BiFunction<Consumer<Integer>, Message<Integer>, Boolean>
messageHandler = (consumer, msg) -> {
+ messageCount.incrementAndGet();
+ return true;
+ };
+
+ // Track start time
+ long startTime = System.nanoTime();
+
+ // Call receiveMessages method
+ BrokerTestUtil.receiveMessages(messageHandler,
Duration.ofMillis(quietTimeMs), consumer1, consumer2);
+
+ // Track end time
+ long endTime = System.nanoTime();
+
+ // Verify that messages were attempted to be received
+ verify(consumer1, times(3)).receiveAsync();
+ verify(consumer2, times(2)).receiveAsync();
+
+ // Verify that the message handler was called
+ assertEquals(messageCount.get(), 2);
+
+ // Verify the time spent is as expected (within a reasonable margin)
+ long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime -
startTime);
+ assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs,
+ consumer2DelayMs + quietTimeMs + (quietTimeMs / 2));
+ }
+}
\ No newline at end of file