This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new c5c177a  [feat] Support consumer batch receive. (#33)
c5c177a is described below

commit c5c177af647997d1182f3d4a69c74824f39af38f
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Nov 16 20:00:21 2022 +0800

    [feat] Support consumer batch receive. (#33)
---
 pulsar/__init__.py   | 50 ++++++++++++++++++++++++++++++++++++++++++++++++--
 src/config.cc        |  7 +++++++
 src/consumer.cc      | 10 ++++++++++
 tests/pulsar_test.py | 22 ++++++++++++++++++++++
 4 files changed, 87 insertions(+), 2 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index c1195de..0007e61 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -46,7 +46,7 @@ import logging
 import _pulsar
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, 
PartitionsRoutingMode, BatchingType, \
-    LoggerLevel # noqa: F401
+    LoggerLevel, BatchReceivePolicy  # noqa: F401
 
 from pulsar.exceptions import *
 
@@ -657,7 +657,8 @@ class Client:
                   replicate_subscription_state_enabled=False,
                   max_pending_chunked_message=10,
                   auto_ack_oldest_chunked_message_on_queue_full=False,
-                  start_message_id_inclusive=False
+                  start_message_id_inclusive=False,
+                  batch_receive_policy=None
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -740,6 +741,8 @@ class Client:
           if autoAckOldestChunkedMessageOnQueueFull is true else it marks them 
for redelivery.
         start_message_id_inclusive: bool, default=False
           Set the consumer to include the given position of any reset 
operation like Consumer::seek.
+        batch_receive_policy: class ConsumerBatchReceivePolicy
+          Set the batch collection policy for batch receiving.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -759,6 +762,7 @@ class Client:
         _check_type(int, max_pending_chunked_message, 
'max_pending_chunked_message')
         _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 
'auto_ack_oldest_chunked_message_on_queue_full')
         _check_type(bool, start_message_id_inclusive, 
'start_message_id_inclusive')
+        _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 
'batch_receive_policy')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -788,6 +792,8 @@ class Client:
         conf.max_pending_chunked_message(max_pending_chunked_message)
         
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
         conf.start_message_id_inclusive(start_message_id_inclusive)
+        if batch_receive_policy:
+            conf.batch_receive_policy(batch_receive_policy.policy())
 
         c = Consumer()
         if isinstance(topic, str):
@@ -1237,6 +1243,20 @@ class Consumer:
         m._schema = self._schema
         return m
 
+    def batch_receive(self):
+        """
+        Receive a batch of messages.
+
+        This call blocks until enough messages have been collected or the
+        wait timeout has elapsed, as configured by the
+        `ConsumerBatchReceivePolicy` passed to `Client.subscribe()`.
+
+        Returns
+        -------
+
+        A list of `Message` objects, in the order the underlying consumer
+        returned them.
+        """
+        messages = []
+        for msg in self._consumer.batch_receive():
+            m = Message()
+            m._message = msg
+            # Attach the consumer's schema, exactly as receive() does, so
+            # schema-aware decoding also works for batch-received messages.
+            m._schema = self._schema
+            messages.append(m)
+        return messages
+
     def acknowledge(self, message):
         """
         Acknowledge the reception of a single message.
@@ -1354,6 +1374,32 @@ class Consumer:
         """
         return self._consumer.get_last_message_id()
 
+class ConsumerBatchReceivePolicy:
+    """
+    Batch receive policy for a consumer.
+
+    The policy can limit the number and total bytes of messages in a single
+    batch, and can specify a timeout for waiting for enough messages.
+
+    A batch receive action completes as soon as any one of the conditions
+    is met: the batch holds enough messages, the batch holds enough bytes,
+    or the waiting timeout has passed.
+    """
+    def __init__(self, max_num_message=-1, max_num_bytes=10 * 1024 * 1024,
+                 timeout_ms=100):
+        """
+        Wrap a native `BatchReceivePolicy`.
+
+        Parameters
+        ----------
+
+        max_num_message: int, default=-1
+          Max number of messages; if less than 0, it means no limit.
+        max_num_bytes: int, default=10 * 1024 * 1024
+          Max number of bytes; if less than 0, it means no limit.
+        timeout_ms: int, default=100
+          Wait timeout in milliseconds; if less than 0, it means no limit.
+        """
+        # The arguments are forwarded unchanged; any validation is left to
+        # the native BatchReceivePolicy constructor.
+        self._policy = BatchReceivePolicy(max_num_message, max_num_bytes,
+                                          timeout_ms)
+
+    def policy(self):
+        """
+        Return the wrapped native `BatchReceivePolicy`.
+        """
+        return self._policy
 
 class Reader:
     """
diff --git a/src/config.cc b/src/config.cc
index d2ed103..f2b7187 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -253,6 +253,11 @@ void export_config() {
         .def("encryption_key", &ProducerConfiguration::addEncryptionKey, 
return_self<>())
         .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, 
return_self<>());
 
+    class_<BatchReceivePolicy>("BatchReceivePolicy", init<int, int, long>())
+        .def("getTimeoutMs", &BatchReceivePolicy::getTimeoutMs)
+        .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
+        .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
+
     class_<ConsumerConfiguration>("ConsumerConfiguration")
         .def("consumer_type", &ConsumerConfiguration::getConsumerType)
         .def("consumer_type", &ConsumerConfiguration::setConsumerType, 
return_self<>())
@@ -267,6 +272,8 @@ void export_config() {
              
&ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions)
         .def("consumer_name", &ConsumerConfiguration::getConsumerName,
              return_value_policy<copy_const_reference>())
+        .def("batch_receive_policy", 
&ConsumerConfiguration::getBatchReceivePolicy, 
return_value_policy<copy_const_reference>())
+        .def("batch_receive_policy", 
&ConsumerConfiguration::setBatchReceivePolicy)
         .def("consumer_name", &ConsumerConfiguration::setConsumerName)
         .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::getUnAckedMessagesTimeoutMs)
         .def("unacked_messages_timeout_ms", 
&ConsumerConfiguration::setUnAckedMessagesTimeoutMs)
diff --git a/src/consumer.cc b/src/consumer.cc
index 811ceb3..5298fae 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -42,6 +42,15 @@ Message Consumer_receive_timeout(Consumer& consumer, int 
timeoutMs) {
     return msg;
 }
 
+// Python binding for Consumer::batchReceive: returns the collected batch.
+Messages Consumer_batch_receive(Consumer& consumer) {
+    Messages batch;
+    Result status;
+
+    // Release the GIL around the native call so other Python threads can
+    // run while this one waits for the batch.
+    Py_BEGIN_ALLOW_THREADS
+    status = consumer.batchReceive(batch);
+    Py_END_ALLOW_THREADS
+
+    // CHECK_RESULT is a project macro; presumably it raises the matching
+    // Python exception when status is not OK -- confirm in utils.h.
+    CHECK_RESULT(status);
+    return batch;
+}
+
 void Consumer_acknowledge(Consumer& consumer, const Message& msg) { 
consumer.acknowledgeAsync(msg, nullptr); }
 
 void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& 
msgId) {
@@ -103,6 +112,7 @@ void export_consumer() {
         .def("unsubscribe", &Consumer_unsubscribe)
         .def("receive", &Consumer_receive)
         .def("receive", &Consumer_receive_timeout)
+        .def("batch_receive", &Consumer_batch_receive)
         .def("acknowledge", &Consumer_acknowledge)
         .def("acknowledge", &Consumer_acknowledge_message_id)
         .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index d0f1ba0..30f451d 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -39,6 +39,7 @@ from pulsar import (
     AuthenticationToken,
     InitialPosition,
     CryptoKeyReader,
+    ConsumerBatchReceivePolicy,
 )
 from pulsar.schema import JsonSchema, Record, Integer
 
@@ -1064,6 +1065,27 @@ class PulsarTest(TestCase):
             consumer.receive(100)
         client.close()
 
+    def test_batch_receive(self):
+        """batch_receive() returns every sent message, in send order."""
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-batch-receive-" + str(time.time())
+        num_messages = 10
+        # Byte and time limits are disabled (-1), so the batch can only
+        # complete once num_messages messages have been collected.
+        policy = ConsumerBatchReceivePolicy(num_messages, -1, -1)
+        consumer = client.subscribe(topic, "my-sub",
+                                    consumer_type=ConsumerType.Shared,
+                                    start_message_id_inclusive=True,
+                                    batch_receive_policy=policy)
+        producer = client.create_producer(topic)
+
+        for i in range(num_messages):
+            if i > 0:
+                time.sleep(0.02)
+            producer.send(b"hello-%d" % i)
+
+        msgs = consumer.batch_receive()
+        # Check the size explicitly: without it an empty batch would skip
+        # the loop below and the test would pass vacuously.
+        self.assertEqual(len(msgs), num_messages)
+        for i, msg in enumerate(msgs):
+            self.assertEqual(msg.data(), b"hello-%d" % i)
+
+        client.close()
+
     def test_message_id(self):
         s = MessageId.earliest.serialize()
         self.assertEqual(MessageId.deserialize(s), MessageId.earliest)

Reply via email to