This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 8d8e6b7 PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725) 8d8e6b7 is described below commit 8d8e6b751ee0dc99306a1c61c22a8d75b5927811 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Sat Aug 21 00:50:17 2021 -0700 PIP 83 : Pulsar Reader: Message consumption with pooled buffer (#11725) * PIP 83 : Pulsar Reader: Message consumption with pooled buffer --- .../client/impl/BrokerClientIntegrationTest.java | 56 ++++++++++++++++++++++ .../apache/pulsar/client/api/ReaderBuilder.java | 10 ++++ .../pulsar/client/impl/MultiTopicsReaderImpl.java | 1 + .../pulsar/client/impl/ReaderBuilderImpl.java | 7 ++- .../org/apache/pulsar/client/impl/ReaderImpl.java | 1 + .../client/impl/conf/ReaderConfigurationData.java | 2 + 6 files changed, 76 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index fb3c30b..a111dd8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -81,6 +81,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.SchemaDefinition; @@ -953,4 +954,59 @@ public class BrokerClientIntegrationTest extends ProducerConsumerBase { consumer.close(); producer.close(); } + + /** + * It validates pooled message consumption for batch and non-batch messages. 
+ * + * @throws Exception + */ + @Test(dataProvider = "booleanFlagProvider") + public void testConsumerWithPooledMessagesWithReader(boolean isBatchingEnabled) throws Exception { + log.info("-- Starting {} test --", methodName); + + @Cleanup + PulsarClient newPulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build(); + + final String topic = "persistent://my-property/my-ns/testConsumerWithPooledMessages" + isBatchingEnabled; + + @Cleanup + Reader<ByteBuffer> reader = newPulsarClient.newReader(Schema.BYTEBUFFER).topic(topic).poolMessages(true) + .startMessageId(MessageId.latest).create(); + + @Cleanup + Producer<byte[]> producer = newPulsarClient.newProducer().topic(topic).enableBatching(isBatchingEnabled).create(); + + final int numMessages = 100; + for (int i = 0; i < numMessages; i++) { + producer.newMessage().value(("value-" + i).getBytes(UTF_8)) + .eventTime((i + 1) * 100L).sendAsync(); + } + producer.flush(); + + // Reuse pre-allocated pooled buffer to process every message + byte[] val = null; + int size = 0; + for (int i = 0; i < numMessages; i++) { + Message<ByteBuffer> msg = reader.readNext(); + ByteBuffer value; + try { + value = msg.getValue(); + int capacity = value.remaining(); + // expand the size of buffer if needed + if (capacity > size) { + val = new byte[capacity]; + size = capacity; + } + // read message into pooled buffer + value.get(val, 0, capacity); + // process the message + assertEquals(("value-" + i), new String(val, 0, capacity)); + assertTrue(value.isDirect()); + } finally { + msg.release(); + } + } + reader.close(); + producer.close(); + } } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index a84208b..4186df7 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ 
-280,4 +280,14 @@ public interface ReaderBuilder<T> extends Cloneable { * @return the reader builder instance */ ReaderBuilder<T> keyHashRange(Range... ranges); + + /** + * Enable pooling of messages and the underlying data buffers. + * <p/> + * When pooling is enabled, the application is responsible for calling Message.release() after the handling of every + * received message. If “release()” is not called on a received message, there will be a memory leak. If an + * application attempts to use an already “released” message, it might experience undefined behavior (for example, memory + * corruption, deserialization error, etc.). + */ + ReaderBuilder<T> poolMessages(boolean poolMessages); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index 32c9869..fab61b2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -64,6 +64,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> { consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted()); + consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages()); if (readerConfiguration.getReaderListener() != null) { ReaderListener<T> readerListener = readerConfiguration.getReaderListener(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index b8f017f..4305bb6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -40,7 +40,6 @@ import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.DefaultCryptoKeyReader; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.pulsar.common.util.FutureUtil; @@ -223,4 +222,10 @@ public class ReaderBuilderImpl<T> implements ReaderBuilder<T> { conf.setKeyHashRanges(Arrays.asList(ranges)); return this; } + + @Override + public ReaderBuilder<T> poolMessages(boolean poolMessages) { + conf.setPoolMessages(poolMessages); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 5b86864..37d346e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -69,6 +69,7 @@ public class ReaderImpl<T> implements Reader<T> { consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted()); + consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages()); // Reader doesn't need any batch receiving behaviours // disable the batch receive timer for the ConsumerImpl instance wrapped by the ReaderImpl diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 14770ad..47efa86 100644 --- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -61,6 +61,8 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable { private transient List<Range> keyHashRanges; + private boolean poolMessages = false; + @JsonIgnore public String getTopicName() { if (topicNames.size() > 1) {