This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 52e8730613c [fix][test] Fix quiet time implementation in 
BrokerTestUtil.receiveMessages (#23876)
52e8730613c is described below

commit 52e8730613c36008ea57a0ca5c10231512232d7e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Jan 22 22:48:46 2025 -0800

    [fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMessages 
(#23876)
---
 .../org/apache/pulsar/broker/BrokerTestUtil.java   |  41 +++++---
 .../apache/pulsar/broker/BrokerTestUtilTest.java   | 115 +++++++++++++++++++++
 2 files changed, 143 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
index 8364cae53b2..e97928c4c66 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java
@@ -39,10 +39,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.stream.Stream;
 import lombok.SneakyThrows;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
@@ -239,33 +241,46 @@ public class BrokerTestUtil {
     public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, 
Boolean> messageHandler,
                                            Duration quietTimeout,
                                            Stream<Consumer<T>> consumers) {
+        long quietTimeoutNanos = quietTimeout.toNanos();
+        AtomicLong lastMessageReceivedNanos = new 
AtomicLong(System.nanoTime());
         FutureUtil.waitForAll(consumers
-                .map(consumer -> receiveMessagesAsync(consumer, quietTimeout, 
messageHandler)).toList()).join();
+                .map(consumer -> receiveMessagesAsync(consumer, 
quietTimeoutNanos, quietTimeoutNanos, messageHandler,
+                        lastMessageReceivedNanos)).toList()).join();
     }
 
     // asynchronously receive messages from a consumer and handle them using 
the provided message handler
     // the benefit is that multiple consumers can be concurrently consumed 
without the need to have multiple threads
     // this is useful in tests where multiple consumers are needed to test the 
functionality
-    private static <T> CompletableFuture<Void> 
receiveMessagesAsync(Consumer<T> consumer, Duration quietTimeout,
-                                                             
BiFunction<Consumer<T>, Message<T>, Boolean>
-                                                                     
messageHandler) {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        return receiveFuture
-                .orTimeout(quietTimeout.toMillis(), TimeUnit.MILLISECONDS)
+    private static <T> CompletableFuture<Void> 
receiveMessagesAsync(Consumer<T> consumer,
+                                                                    long 
quietTimeoutNanos,
+                                                                    long 
receiveTimeoutNanos,
+                                                                    
BiFunction<Consumer<T>, Message<T>, Boolean>
+                                                                            
messageHandler,
+                                                                    AtomicLong 
lastMessageReceivedNanos) {
+        return consumer.receiveAsync()
+                .orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS)
                 .handle((msg, t) -> {
+                    long currentNanos = System.nanoTime();
                     if (t != null) {
                         if (t instanceof TimeoutException) {
-                            // cancel the receive future so that Pulsar client 
can clean up the resources
-                            receiveFuture.cancel(false);
-                            return false;
+                            long sinceLastMessageReceivedNanos = currentNanos 
- lastMessageReceivedNanos.get();
+                            if (sinceLastMessageReceivedNanos > 
quietTimeoutNanos) {
+                                return Pair.of(false, 0L);
+                            } else {
+                                return Pair.of(true, quietTimeoutNanos - 
sinceLastMessageReceivedNanos);
+                            }
                         } else {
                             throw FutureUtil.wrapToCompletionException(t);
                         }
                     }
-                    return messageHandler.apply(consumer, msg);
-                }).thenComposeAsync(receiveMore -> {
+                    lastMessageReceivedNanos.set(currentNanos);
+                    return Pair.of(messageHandler.apply(consumer, msg), 
quietTimeoutNanos);
+                }).thenComposeAsync(receiveMoreAndNextTimeout -> {
+                    boolean receiveMore = receiveMoreAndNextTimeout.getLeft();
                     if (receiveMore) {
-                        return receiveMessagesAsync(consumer, quietTimeout, 
messageHandler);
+                        Long nextReceiveTimeoutNanos = 
receiveMoreAndNextTimeout.getRight();
+                        return receiveMessagesAsync(consumer, 
quietTimeoutNanos, nextReceiveTimeoutNanos,
+                                messageHandler, lastMessageReceivedNanos);
                     } else {
                         return CompletableFuture.completedFuture(null);
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java
new file mode 100644
index 00000000000..90b917a319c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class BrokerTestUtilTest {
+    @Test
+    public void testReceiveMessagesQuietTime() throws Exception {
+        // Mock consumers
+        Consumer<Integer> consumer1 = mock(Consumer.class);
+        Consumer<Integer> consumer2 = mock(Consumer.class);
+
+        long consumer1DelayMs = 300L;
+        long consumer2DelayMs = 400L;
+        long quietTimeMs = 500L;
+
+        // Define behavior for receiveAsync with delay
+        AtomicBoolean consumer1FutureContinueSupplying = new 
AtomicBoolean(true);
+        when(consumer1.receiveAsync()).thenAnswer(invocation -> {
+            if (consumer1FutureContinueSupplying.get()) {
+                CompletableFuture<Message> messageCompletableFuture =
+                        CompletableFuture.supplyAsync(() -> 
mock(Message.class),
+                                
CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS));
+                consumer1FutureContinueSupplying.set(false);
+                // continue supplying while the future is cancelled or timed 
out
+                FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, 
() -> {
+                    consumer1FutureContinueSupplying.set(true);
+                });
+                return messageCompletableFuture;
+            } else {
+                return new CompletableFuture<>();
+            }
+        });
+        AtomicBoolean consumer2FutureContinueSupplying = new 
AtomicBoolean(true);
+        when(consumer2.receiveAsync()).thenAnswer(invocation -> {
+            if (consumer2FutureContinueSupplying.get()) {
+                CompletableFuture<Message> messageCompletableFuture =
+                        CompletableFuture.supplyAsync(() -> 
mock(Message.class),
+                                
CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS));
+                consumer2FutureContinueSupplying.set(false);
+                // continue supplying while the future is cancelled or timed 
out
+                FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, 
() -> {
+                    consumer2FutureContinueSupplying.set(true);
+                });
+                return messageCompletableFuture;
+            } else {
+                return new CompletableFuture<>();
+            }
+        });
+
+        // Atomic variables to track message handling
+        AtomicInteger messageCount = new AtomicInteger(0);
+
+        // Message handler
+        BiFunction<Consumer<Integer>, Message<Integer>, Boolean> 
messageHandler = (consumer, msg) -> {
+            messageCount.incrementAndGet();
+            return true;
+        };
+
+        // Track start time
+        long startTime = System.nanoTime();
+
+        // Call receiveMessages method
+        BrokerTestUtil.receiveMessages(messageHandler, 
Duration.ofMillis(quietTimeMs), consumer1, consumer2);
+
+        // Track end time
+        long endTime = System.nanoTime();
+
+        // Verify that messages were attempted to be received
+        verify(consumer1, times(3)).receiveAsync();
+        verify(consumer2, times(2)).receiveAsync();
+
+        // Verify that the message handler was called
+        assertEquals(messageCount.get(), 2);
+
+        // Verify the time spent is as expected (within a reasonable margin)
+        long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - 
startTime);
+        assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs,
+                consumer2DelayMs + quietTimeMs + (quietTimeMs / 2));
+    }
+}
\ No newline at end of file

Reply via email to