This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d8e6b7  PIP 83 : Pulsar Reader: Message consumption with pooled 
buffer (#11725)
8d8e6b7 is described below

commit 8d8e6b751ee0dc99306a1c61c22a8d75b5927811
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Sat Aug 21 00:50:17 2021 -0700

    PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725)
    
    * PIP 83 : Pulsar Reader: Message consumption with pooled buffer
---
 .../client/impl/BrokerClientIntegrationTest.java   | 56 ++++++++++++++++++++++
 .../apache/pulsar/client/api/ReaderBuilder.java    | 10 ++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  1 +
 .../pulsar/client/impl/ReaderBuilderImpl.java      |  7 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |  1 +
 .../client/impl/conf/ReaderConfigurationData.java  |  2 +
 6 files changed, 76 insertions(+), 1 deletion(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index fb3c30b..a111dd8 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
@@ -953,4 +954,59 @@ public class BrokerClientIntegrationTest extends 
ProducerConsumerBase {
         consumer.close();
         producer.close();
     }
+
+    /**
+     * Validates pooled message consumption through a reader for batch and non-batch messages (producer batching
+     * follows {@code isBatchingEnabled}): each value must be a direct buffer that matches the published payload.
+     * @throws Exception if any client, reader or producer operation in the test fails
+     */
+    @Test(dataProvider = "booleanFlagProvider")
+    public void testConsumerWithPooledMessagesWithReader(boolean 
isBatchingEnabled) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        @Cleanup
+        PulsarClient newPulsarClient = 
PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
+
+        final String topic = 
"persistent://my-property/my-ns/testConsumerWithPooledMessages" + 
isBatchingEnabled;
+
+        @Cleanup
+        Reader<ByteBuffer> reader = 
newPulsarClient.newReader(Schema.BYTEBUFFER).topic(topic).poolMessages(true)
+                .startMessageId(MessageId.latest).create();
+
+        @Cleanup
+        Producer<byte[]> producer = 
newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create();
+
+        final int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value(("value-" + i).getBytes(UTF_8))
+            .eventTime((i + 1) * 100L).sendAsync();
+        }
+        producer.flush();
+
+        // Reuse one scratch byte[] across all messages, growing it only when a larger payload arrives
+        byte[] val = null;
+        int size = 0;
+        for (int i = 0; i < numMessages; i++) {
+            Message<ByteBuffer> msg = reader.readNext();
+            ByteBuffer value;
+            try {
+                value = msg.getValue();
+                int capacity = value.remaining();
+                // grow the scratch array if this payload does not fit
+                if (capacity > size) {
+                    val = new byte[capacity];
+                    size = capacity;
+                }
+                // copy the payload out of the message's buffer into the scratch array
+                value.get(val, 0, capacity);
+                // check the payload (NOTE(review): decoded with the platform default charset, encoded with UTF_8 above)
+                assertEquals(("value-" + i), new String(val, 0, capacity));
+                assertTrue(value.isDirect()); // the test expects pooled values to be direct buffers
+            } finally {
+                msg.release(); // always release the message, even if an assertion above fails
+            }
+        }
+        reader.close(); // NOTE(review): reader and producer are also annotated with @Cleanup above
+        producer.close();
+    }
 }
diff --git 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
index a84208b..4186df7 100644
--- 
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
+++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java
@@ -280,4 +280,14 @@ public interface ReaderBuilder<T> extends Cloneable {
      * @return the reader builder instance
      */
     ReaderBuilder<T> keyHashRange(Range... ranges);
+
+    /**
+     * Enable pooling of messages and the underlying data buffers.
+     * <p>When pooling is enabled, the application must call {@code Message.release()} after handling every received
+     * message; otherwise there will be a memory leak. Using an already released message might result in undefined
+     * behavior (for example, memory corruption, deserialization error, etc.).
+     * @param poolMessages whether to enable pooling of received messages
+     * @return the reader builder instance
+     */
+    ReaderBuilder<T> poolMessages(boolean poolMessages);
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 32c9869..fab61b2 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -64,6 +64,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
         consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
         
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
+        
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
 
         if (readerConfiguration.getReaderListener() != null) {
             ReaderListener<T> readerListener = 
readerConfiguration.getReaderListener();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index b8f017f..4305bb6 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -40,7 +40,6 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.ReaderListener;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.DefaultCryptoKeyReader;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -223,4 +222,10 @@ public class ReaderBuilderImpl<T> implements 
ReaderBuilder<T> {
         conf.setKeyHashRanges(Arrays.asList(ranges));
         return this;
     }
+
+    @Override
+    public ReaderBuilder<T> poolMessages(boolean poolMessages) {
+        conf.setPoolMessages(poolMessages); // only records the flag on conf; nothing else is applied here
+        return this;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 5b86864..37d346e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -69,6 +69,7 @@ public class ReaderImpl<T> implements Reader<T> {
         consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
         
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
         
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
+        
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());
 
         // Reader doesn't need any batch receiving behaviours
         // disable the batch receive timer for the ConsumerImpl instance 
wrapped by the ReaderImpl
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
index 14770ad..47efa86 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -61,6 +61,8 @@ public class ReaderConfigurationData<T> implements 
Serializable, Cloneable {
 
     private transient List<Range> keyHashRanges;
 
+    private boolean poolMessages = false;
+
     @JsonIgnore
     public String getTopicName() {
         if (topicNames.size() > 1) {

Reply via email to