This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7274ee14eb0 [improve][client] RawReader support pause and resume
(#24597)
7274ee14eb0 is described below
commit 7274ee14eb09fab38d289f81b633342ccf826d34
Author: Hang Chen <[email protected]>
AuthorDate: Mon Aug 4 17:29:18 2025 -0700
[improve][client] RawReader support pause and resume (#24597)
---
.../org/apache/pulsar/client/api/RawReader.java | 15 +++
.../apache/pulsar/client/impl/RawReaderImpl.java | 10 ++
.../apache/pulsar/client/impl/RawReaderTest.java | 126 +++++++++++++++++++++
3 files changed, 151 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
index ae4927da5ce..e6065534a67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/api/RawReader.java
@@ -101,4 +101,19 @@ public interface RawReader {
* Close the raw reader.
*/
CompletableFuture<Void> closeAsync();
+
+ /**
+ * Stop requesting new messages from the broker until {@link #resume()} is called.
+ *
+ * <p>Note that this might cause {@link #readNextAsync()} to block until {@link #resume()} is
+ * called and new messages are pushed by the broker.
+ *
+ * <p>The default implementation does nothing; implementations that support pausing must
+ * override it.
+ */
+ default void pause() {
+
+ }
+
+ /**
+ * Resume requesting messages from the broker after a previous call to {@link #pause()}.
+ *
+ * <p>The default implementation does nothing; implementations that support pausing must
+ * override it.
+ */
+ default void resume() {
+
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index 32f75d71dc3..02c0fd568d8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -112,6 +112,16 @@ public class RawReaderImpl implements RawReader {
return consumer.getLastMessageIdAsync();
}
+ @Override
+ public void pause() { // RawReader#pause: forwarded unchanged to the wrapped consumer
+ consumer.pause();
+ }
+
+ @Override
+ public void resume() { // RawReader#resume: forwarded unchanged to the wrapped consumer
+ consumer.resume();
+ }
+
@Override
public String toString() {
return "RawReader(topic=" + getTopic() + ")";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index 66475624334..bb98ca1e4e7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -22,6 +22,8 @@ import static
org.apache.pulsar.client.impl.RawReaderImpl.DEFAULT_RECEIVER_QUEUE
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
@@ -37,6 +39,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -619,4 +623,126 @@ public class RawReaderTest extends
MockedPulsarServiceBaseTest {
reader.closeAsync().get();
admin.topics().delete(topic, false);
}
+
+ @Test(timeOut = 100000)
+ public void testPauseAndResume() throws Exception {
+ log.info("-- Starting testPauseAndResume test --");
+
+ int receiverQueueSize = 20; // number of permits broker has when
consumer initially subscribes
+
+ String topic = "persistent://my-property/my-ns/my-topic-pr";
+ String subscription = "my-subscriber-name";
+
+ AtomicReference<CountDownLatch> latch = new AtomicReference<>(new
CountDownLatch(receiverQueueSize));
+ AtomicInteger received = new AtomicInteger();
+ ConsumerConfigurationData<byte[]> consumerConfiguration = new
ConsumerConfigurationData<>();
+ consumerConfiguration.getTopicNames().add(topic);
+ consumerConfiguration.setSubscriptionName(subscription);
+ consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+ consumerConfiguration.setReceiverQueueSize(receiverQueueSize);
+ RawReader reader = RawReader.create(pulsarClient,
consumerConfiguration, true, true).get();
+
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ reader.pause();
+
+ for (int i = 0; i < receiverQueueSize * 2; i++) {
+ producer.send(("my-message-" + i).getBytes());
+ }
+
+ new Thread(() -> {
+ try {
+ while (reader.hasMessageAvailableAsync().get()) {
+ var msg = reader.readNextAsync().get();
+ received.incrementAndGet();
+ msg.getHeadersAndPayload().release();
+ latch.get().countDown();
+ log.info("Received message [{}] in the reader",
msg.getMessageId());
+ }
+ } catch (Exception e) {
+
+ }
+ }).start();
+
+ log.info("Waiting for message listener to ack " + receiverQueueSize +
" messages");
+ assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+ "Timed out waiting for message listener acks");
+
+ log.info("Giving message listener an opportunity to receive messages
while paused");
+ Awaitility.await().untilAsserted(
+ () -> assertEquals(received.intValue(), receiverQueueSize,
+ "Consumer received messages while paused"));
+
+ latch.set(new CountDownLatch(receiverQueueSize));
+
+ reader.resume();
+
+ log.info("Waiting for message listener to ack all messages");
+ assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+ "Timed out waiting for message listener acks");
+
+ reader.closeAsync();
+ producer.close();
+ log.info("-- Exiting testPauseAndResume test --");
+ }
+
+ /**
+ * Verifies that a paused {@link RawReader} stays paused across a topic unload: it must not
+ * receive further messages after the consumer reconnects, and must receive the rest once
+ * {@link RawReader#resume()} is called.
+ *
+ * @throws Exception if the reader or producer cannot be created, the unload fails, or a wait
+ *                   is interrupted
+ */
+ @Test(timeOut = 30000)
+ public void testPauseAndResumeWithUnloading() throws Exception {
+     final String topicName = "persistent://my-property/my-ns/pause-and-resume-with-unloading";
+     final String subName = "sub";
+     final int receiverQueueSize = 20;
+
+     AtomicReference<CountDownLatch> latch =
+             new AtomicReference<>(new CountDownLatch(receiverQueueSize));
+     AtomicInteger received = new AtomicInteger();
+
+     ConsumerConfigurationData<byte[]> consumerConfiguration = new ConsumerConfigurationData<>();
+     consumerConfiguration.getTopicNames().add(topicName);
+     consumerConfiguration.setSubscriptionName(subName);
+     consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
+     consumerConfiguration.setReceiverQueueSize(receiverQueueSize);
+     RawReader reader = RawReader.create(pulsarClient, consumerConfiguration, true, true).get();
+
+     Producer<byte[]> producer = null;
+     try {
+         reader.pause();
+
+         producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();
+
+         for (int i = 0; i < receiverQueueSize * 2; i++) {
+             producer.send(("my-message-" + i).getBytes());
+         }
+
+         Thread readerThread = new Thread(() -> {
+             try {
+                 while (reader.hasMessageAvailableAsync().get()) {
+                     var msg = reader.readNextAsync().get();
+                     received.incrementAndGet();
+                     msg.getHeadersAndPayload().release();
+                     latch.get().countDown();
+                     log.info("Received message [{}] in the reader", msg.getMessageId());
+                 }
+             } catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+             } catch (Exception e) {
+                 // Presumably also reached when the reader is closed at the end of the test while a
+                 // read is pending. Logged so that a real failure in this thread is not mistaken
+                 // for a plain latch timeout.
+                 log.warn("Reader loop for topic {} stopped", topicName, e);
+             }
+         }, "testPauseAndResumeWithUnloading-reader");
+         // Daemon so that a reader stuck on a pending read cannot keep the JVM alive.
+         readerThread.setDaemon(true);
+         readerThread.start();
+
+         // Paused consumer receives only `receiverQueueSize` messages
+         assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+                 "Timed out waiting for message listener acks");
+
+         // Make sure no flow permits are sent when the consumer reconnects to the topic.
+         admin.topics().unload(topicName);
+         // Delay the first check so the consumer has time to reconnect; an immediate assertion
+         // would pass before any reconnect-triggered delivery could happen.
+         Awaitility.await().pollDelay(2, TimeUnit.SECONDS).untilAsserted(
+                 () -> assertEquals(received.intValue(), receiverQueueSize,
+                         "Consumer received messages while paused"));
+
+         latch.set(new CountDownLatch(receiverQueueSize));
+         reader.resume();
+         assertTrue(latch.get().await(receiverQueueSize, TimeUnit.SECONDS),
+                 "Timed out waiting for message listener acks");
+     } finally {
+         // Release both clients even when an assertion above fails, and wait for the reader to
+         // finish closing, as the other tests in this class do.
+         try {
+             reader.closeAsync().get();
+         } finally {
+             if (producer != null) {
+                 producer.close();
+             }
+         }
+     }
+ }
}