This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7274ee14eb0 [improve][client] RawReader support pause and resume 
(#24597)
7274ee14eb0 is described below

commit 7274ee14eb09fab38d289f81b633342ccf826d34
Author: Hang Chen <[email protected]>
AuthorDate: Mon Aug 4 17:29:18 2025 -0700

    [improve][client] RawReader support pause and resume (#24597)
---
 .../org/apache/pulsar/client/api/RawReader.java    |  15 +++
 .../apache/pulsar/client/impl/RawReaderImpl.java   |  10 ++
 .../apache/pulsar/client/impl/RawReaderTest.java   | 126 +++++++++++++++++++++
 3 files changed, 151 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index ae4927da5ce..e6065534a67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -101,4 +101,19 @@ public interface RawReader {
      * Close the raw reader.
      */
     CompletableFuture<Void> closeAsync();
+
+    /**
+     * Stop requesting new messages from the broker until {@link #resume()} is 
called. Note that this might cause
+     * {@link #readNextAsync()} to block until {@link #resume()} is called and 
new messages are pushed by the broker.
+     */
+    default void pause() {
+
+    }
+
+    /**
+     * Resume requesting messages from the broker.
+     */
+    default void resume() {
+
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 32f75d71dc3..02c0fd568d8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -112,6 +112,16 @@ public class RawReaderImpl implements RawReader {
         return consumer.getLastMessageIdAsync();
     }
 
+    @Override
+    public void pause() {
+        consumer.pause();
+    }
+
+    @Override
+    public void resume() {
+        consumer.resume();
+    }
+
     @Override
     public String toString() {
         return "RawReader(topic=" + getTopic() + ")";
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 66475624334..bb98ca1e4e7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -22,6 +22,8 @@ import static 
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Sets;
 import io.netty.buffer.ByteBuf;
 import java.util.ArrayList;
@@ -37,6 +39,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -619,4 +623,126 @@ public class RawReaderTest extends 
MockedPulsarServiceBaseTest {
         reader.closeAsync().get();
         admin.topics().delete(topic, false);
     }
+
+    @Test(timeOut = 100000)
+    public void testPauseAndResume() throws Exception {
+        log.info("-- Starting testPauseAndResume test --");
+
+        int receiverQueueSize = 20;     // number of permits broker has when 
consumer initially subscribes
+
+        String topic = "persistent://my-property/my-ns/my-topic-pr";
+        String subscription = "my-subscriber-name";
+
+        AtomicReference<CountDownLatch> latch = new AtomicReference<>(new 
CountDownLatch(receiverQueueSize));
+        AtomicInteger received = new AtomicInteger();
+        ConsumerConfigurationData<byte[]> consumerConfiguration = new 
ConsumerConfigurationData<>();
+        consumerConfiguration.getTopicNames().add(topic);
+        consumerConfiguration.setSubscriptionName(subscription);
+        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+        consumerConfiguration.setReceiverQueueSize(receiverQueueSize);
+        RawReader reader = RawReader.create(pulsarClient, 
consumerConfiguration, true, true).get();
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .create();
+
+        reader.pause();
+
+        for (int i = 0; i < receiverQueueSize * 2; i++) {
+            producer.send(("my-message-" + i).getBytes());
+        }
+
+        new Thread(() -> {
+            try {
+                while (reader.hasMessageAvailableAsync().get()) {
+                    var msg = reader.readNextAsync().get();
+                    received.incrementAndGet();
+                    msg.getHeadersAndPayload().release();
+                    latch.get().countDown();
+                    log.info("Received message [{}] in the reader", 
msg.getMessageId());
+                }
+            } catch (Exception e) {
+
+            }
+        }).start();
+
+        log.info("Waiting for message listener to ack " + receiverQueueSize + 
" messages");
+        assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+                "Timed out waiting for message listener acks");
+
+        log.info("Giving message listener an opportunity to receive messages 
while paused");
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(received.intValue(), receiverQueueSize,
+                        "Consumer received messages while paused"));
+
+        latch.set(new CountDownLatch(receiverQueueSize));
+
+        reader.resume();
+
+        log.info("Waiting for message listener to ack all messages");
+        assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+                "Timed out waiting for message listener acks");
+
+        reader.closeAsync();
+        producer.close();
+        log.info("-- Exiting testPauseAndResume test --");
+    }
+
+    @Test(timeOut = 30000)
+    public void testPauseAndResumeWithUnloading() throws Exception {
+        final String topicName = 
"persistent://my-property/my-ns/pause-and-resume-with-unloading";
+        final String subName = "sub";
+        final int receiverQueueSize = 20;
+
+        AtomicReference<CountDownLatch> latch = new AtomicReference<>(new 
CountDownLatch(receiverQueueSize));
+        AtomicInteger received = new AtomicInteger();
+
+        ConsumerConfigurationData<byte[]> consumerConfiguration = new 
ConsumerConfigurationData<>();
+        consumerConfiguration.getTopicNames().add(topicName);
+        consumerConfiguration.setSubscriptionName(subName);
+        consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+        consumerConfiguration.setReceiverQueueSize(receiverQueueSize);
+        RawReader reader = RawReader.create(pulsarClient, 
consumerConfiguration, true, true).get();
+
+        reader.pause();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
+
+        for (int i = 0; i < receiverQueueSize * 2; i++) {
+            producer.send(("my-message-" + i).getBytes());
+        }
+
+        new Thread(() -> {
+            try {
+                while (reader.hasMessageAvailableAsync().get()) {
+                    var msg = reader.readNextAsync().get();
+                    received.incrementAndGet();
+                    msg.getHeadersAndPayload().release();
+                    latch.get().countDown();
+                    log.info("Received message [{}] in the reader", 
msg.getMessageId());
+                }
+            } catch (Exception e) {
+                //
+            }
+        }).start();
+
+        // Paused consumer receives only `receiverQueueSize` messages
+        assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+                "Timed out waiting for message listener acks");
+
+        // Make sure no flow permits are sent when the consumer reconnects to 
the topic
+        admin.topics().unload(topicName);
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(received.intValue(), receiverQueueSize, 
"Consumer received messages while paused"));
+
+
+        latch.set(new CountDownLatch(receiverQueueSize));
+        reader.resume();
+        assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+                "Timed out waiting for message listener acks");
+
+        reader.closeAsync();
+        producer.close();
+    }
 }

Reply via email to