This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new eb34eaf  [Enhancement/Feature Issue #47] Added support for 
KeySharedPolicy for the consumer when in KeyShared mode. (#109)
eb34eaf is described below

commit eb34eafa7fef18713937843c0793fc82a622cd30
Author: Tommy <ad...@hyperevo.com>
AuthorDate: Sun Jun 4 23:36:15 2023 -0700

    [Enhancement/Feature Issue #47] Added support for KeySharedPolicy for the 
consumer when in KeyShared mode. (#109)
    
    ### Motivation
    The Pulsar Python client lacks support for configuring KeyShared behaviour, 
such as out-of-order message delivery and the sticky and auto-split hash modes, 
for consumers in KeyShared mode. This PR adds full support: the user can now 
provide a KeySharedPolicy when creating a consumer with client.subscribe() (#47).
    
    The ConsumerConfiguration::KeySharedPolicy and related setter/getter are 
now exposed to the Python client in this PR.
    
    ### Modifications
    
    - Added pybind11 enum for KeySharedMode in src/enums.cc.
    - Added pybind11 class for KeySharedPolicy in src/config.cc.
    - Modified the pybind11 class for ConsumerConfiguration, adding one function 
to set the KeySharedPolicy and one function to read it.
    - Added KeySharedPolicy wrapper to pulsar/__init__.py. This wrapper handles 
KeySharedPolicy initialization and does some value validation.
    - Added the key_shared_policy parameter to client.subscribe(), validated its 
type, and applied it to the consumer configuration in pulsar/__init__.py.
    - Added 4 new tests covering the new KeySharedPolicy functionality to 
tests/pulsar_test.py.
---
 .gitignore           |   6 +++
 pulsar/__init__.py   |  80 ++++++++++++++++++++++++++++++-
 src/config.cc        |  13 +++++
 src/enums.cc         |   5 ++
 tests/pulsar_test.py | 131 +++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 233 insertions(+), 2 deletions(-)

diff --git a/.gitignore b/.gitignore
index 72f931b..b01979f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,3 +17,9 @@ wheelhouse
 vcpkg_installed/
 *.pyd
 *.lib
+
+
+lib_pulsar.so
+tests/test.log
+.tests-container-id.txt
+
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index dbf3d82..5d7cedb 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -43,10 +43,12 @@ Read the instructions on `source code repository
 """
 
 import logging
+from typing import List, Tuple, Optional
+
 import _pulsar
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, 
PartitionsRoutingMode, BatchingType, \
-    LoggerLevel, BatchReceivePolicy  # noqa: F401
+    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode  # noqa: 
F401
 
 from pulsar.__about__ import __version__
 
@@ -689,7 +691,8 @@ class Client:
                   max_pending_chunked_message=10,
                   auto_ack_oldest_chunked_message_on_queue_full=False,
                   start_message_id_inclusive=False,
-                  batch_receive_policy=None
+                  batch_receive_policy=None,
+                  key_shared_policy=None
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -774,6 +777,8 @@ class Client:
           Set the consumer to include the given position of any reset 
operation like Consumer::seek.
         batch_receive_policy: class ConsumerBatchReceivePolicy
           Set the batch collection policy for batch receiving.
+        key_shared_policy: class ConsumerKeySharedPolicy
+            Set the key shared policy for use when the ConsumerType is 
KeyShared.
         """
         _check_type(str, subscription_name, 'subscription_name')
         _check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -794,6 +799,7 @@ class Client:
         _check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 
'auto_ack_oldest_chunked_message_on_queue_full')
         _check_type(bool, start_message_id_inclusive, 
'start_message_id_inclusive')
         _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 
'batch_receive_policy')
+        _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 
'key_shared_policy')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -826,6 +832,9 @@ class Client:
         if batch_receive_policy:
             conf.batch_receive_policy(batch_receive_policy.policy())
 
+        if key_shared_policy:
+            conf.key_shared_policy(key_shared_policy.policy())
+
         c = Consumer()
         if isinstance(topic, str):
             # Single topic
@@ -1448,6 +1457,73 @@ class ConsumerBatchReceivePolicy:
         """
         return self._policy
 
+class ConsumerKeySharedPolicy:
+    """
+    Consumer key shared policy is used to configure the consumer behaviour 
when the ConsumerType is KeyShared.
+    """
+    def __init__(
+            self,
+            key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit,
+            allow_out_of_order_delivery: bool = False,
+            sticky_ranges: Optional[List[Tuple[int, int]]] = None,
+    ):
+        """
+        Wrapper KeySharedPolicy.
+
+        Parameters
+        ----------
+
+        key_shared_mode: KeySharedMode, optional
+            Set the key shared mode, e.g. KeySharedMode.Sticky or 
KeySharedMode.AutoSplit
+
+        allow_out_of_order_delivery: bool, optional
+            Set whether to allow for out of order delivery
+            If it is enabled, it relaxes the ordering requirement and allows 
the broker to send out-of-order
+            messages in case of failures. This makes it faster for new 
consumers to join without being stalled by
+            an existing slow consumer.
+
+            If this is True, a single consumer still receives all keys, but 
they may come in different orders.
+
+        sticky_ranges: List[Tuple[int, int]], optional
+            Set the ranges used with sticky mode. The integers can be from 0 
to 2^16 (0 <= val < 65,536)
+        """
+        if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None:
+            raise ValueError("When using key_shared_mode = 
KeySharedMode.Sticky you must also provide sticky_ranges")
+
+        self._policy = KeySharedPolicy()
+        self._policy.set_key_shared_mode(key_shared_mode)
+        
self._policy.set_allow_out_of_order_delivery(allow_out_of_order_delivery)
+
+        if sticky_ranges is not None:
+            self._policy.set_sticky_ranges(sticky_ranges)
+
+    @property
+    def key_shared_mode(self) -> KeySharedMode:
+        """
+        Returns the key shared mode
+        """
+        return self._policy.get_key_shared_mode()
+
+    @property
+    def allow_out_of_order_delivery(self) -> bool:
+        """
+        Returns whether out of order delivery is enabled
+        """
+        return self._policy.is_allow_out_of_order_delivery()
+
+    @property
+    def sticky_ranges(self) -> List[Tuple[int, int]]:
+        """
+        Returns the actual sticky ranges
+        """
+        return self._policy.get_sticky_ranges()
+
+    def policy(self):
+        """
+        Returns the actual KeySharedPolicy.
+        """
+        return self._policy
+
 class Reader:
     """
     Pulsar topic reader.
diff --git a/src/config.cc b/src/config.cc
index 71795dd..7e2d38d 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -21,8 +21,10 @@
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <pulsar/ConsumerConfiguration.h>
 #include <pulsar/ProducerConfiguration.h>
+#include <pulsar/KeySharedPolicy.h>
 #include <pybind11/functional.h>
 #include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
 #include <memory>
 
 namespace py = pybind11;
@@ -121,6 +123,15 @@ static ClientConfiguration& 
ClientConfiguration_setFileLogger(ClientConfiguratio
 void export_config(py::module_& m) {
     using namespace py;
 
+    class_<KeySharedPolicy, std::shared_ptr<KeySharedPolicy>>(m, 
"KeySharedPolicy")
+        .def(init<>())
+        .def("set_key_shared_mode", &KeySharedPolicy::setKeySharedMode, 
return_value_policy::reference)
+        .def("get_key_shared_mode", &KeySharedPolicy::getKeySharedMode)
+        .def("set_allow_out_of_order_delivery", 
&KeySharedPolicy::setAllowOutOfOrderDelivery, return_value_policy::reference)
+        .def("is_allow_out_of_order_delivery", 
&KeySharedPolicy::isAllowOutOfOrderDelivery)
+        .def("set_sticky_ranges", static_cast<KeySharedPolicy& 
(KeySharedPolicy::*)(const StickyRanges&)>(&KeySharedPolicy::setStickyRanges), 
return_value_policy::reference)
+        .def("get_sticky_ranges", &KeySharedPolicy::getStickyRanges);
+
     class_<CryptoKeyReader, std::shared_ptr<CryptoKeyReader>>(m, 
"AbstractCryptoKeyReader")
         .def("getPublicKey", &CryptoKeyReader::getPublicKey)
         .def("getPrivateKey", &CryptoKeyReader::getPrivateKey);
@@ -222,6 +233,8 @@ void export_config(py::module_& m) {
 
     class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, 
"ConsumerConfiguration")
         .def(init<>())
+        .def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
+        .def("key_shared_policy", &ConsumerConfiguration::setKeySharedPolicy, 
return_value_policy::reference)
         .def("consumer_type", &ConsumerConfiguration::getConsumerType)
         .def("consumer_type", &ConsumerConfiguration::setConsumerType, 
return_value_policy::reference)
         .def("schema", &ConsumerConfiguration::getSchema, 
return_value_policy::copy)
diff --git a/src/enums.cc b/src/enums.cc
index f61011f..8dacc54 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -20,6 +20,7 @@
 #include <pulsar/CompressionType.h>
 #include <pulsar/ConsumerConfiguration.h>
 #include <pulsar/ProducerConfiguration.h>
+#include <pulsar/KeySharedPolicy.h>
 #include <pybind11/pybind11.h>
 
 using namespace pulsar;
@@ -28,6 +29,10 @@ namespace py = pybind11;
 void export_enums(py::module_& m) {
     using namespace py;
 
+    enum_<KeySharedMode>(m, "KeySharedMode")
+        .value("AutoSplit", AUTO_SPLIT)
+        .value("Sticky", STICKY);
+
     enum_<ProducerConfiguration::PartitionsRoutingMode>(m, 
"PartitionsRoutingMode")
         .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition)
         .value("RoundRobinDistribution", 
ProducerConfiguration::RoundRobinDistribution)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index eeb2a6a..3ec89a7 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -32,6 +32,8 @@ from pulsar import (
     MessageId,
     CompressionType,
     ConsumerType,
+    KeySharedMode,
+    ConsumerKeySharedPolicy,
     PartitionsRoutingMode,
     AuthenticationBasic,
     AuthenticationTLS,
@@ -1437,6 +1439,134 @@ class PulsarTest(TestCase):
         producer.flush()
         client.close()
 
+    def test_keyshare_policy(self):
+        with self.assertRaises(ValueError):
+            # Raise error because sticky ranges are not provided.
+            pulsar.ConsumerKeySharedPolicy(
+                key_shared_mode=pulsar.KeySharedMode.Sticky,
+                allow_out_of_order_delivery=False,
+            )
+
+        expected_key_shared_mode = pulsar.KeySharedMode.Sticky
+        expected_allow_out_of_order_delivery = True
+        expected_sticky_ranges = [(0, 100), (101,200)]
+        consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
+            key_shared_mode=expected_key_shared_mode,
+            allow_out_of_order_delivery=expected_allow_out_of_order_delivery,
+            sticky_ranges=expected_sticky_ranges
+        )
+
+        self.assertEqual(consumer_key_shared_policy.key_shared_mode, 
expected_key_shared_mode)
+        
self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, 
expected_allow_out_of_order_delivery)
+        self.assertEqual(consumer_key_shared_policy.sticky_ranges, 
expected_sticky_ranges)
+
+    def test_keyshared_invalid_sticky_ranges(self):
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-keyshare-invalid-" + str(time.time())
+        with self.assertRaises(ValueError):
+            consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
+                key_shared_mode=pulsar.KeySharedMode.Sticky,
+                allow_out_of_order_delivery=False,
+                sticky_ranges=[(0,65536)]
+            )
+            client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.KeyShared,
+                             start_message_id_inclusive=True,
+                             key_shared_policy=consumer_key_shared_policy)
+
+        with self.assertRaises(ValueError):
+            consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
+                key_shared_mode=pulsar.KeySharedMode.Sticky,
+                allow_out_of_order_delivery=False,
+                sticky_ranges=[(0, 100), (50, 150)]
+            )
+            client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.KeyShared,
+                             start_message_id_inclusive=True,
+                             key_shared_policy=consumer_key_shared_policy)
+
+    def test_keyshared_autosplit(self):
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-keyshare-autosplit-" + str(time.time())
+        consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
+            key_shared_mode=pulsar.KeySharedMode.AutoSplit,
+            allow_out_of_order_delivery=True,
+        )
+        consumer = client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1',
+                                    start_message_id_inclusive=True, 
key_shared_policy=consumer_key_shared_policy)
+        consumer2 = client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2',
+                                    start_message_id_inclusive=True, 
key_shared_policy=consumer_key_shared_policy)
+        producer = client.create_producer(topic)
+
+        for i in range(10):
+            if i > 0:
+                time.sleep(0.02)
+            producer.send(b"hello-%d" % i)
+
+        msgs = []
+        while True:
+            try:
+                msg = consumer.receive(100)
+            except pulsar.Timeout:
+                break
+            msgs.append(msg)
+            consumer.acknowledge(msg)
+
+        while True:
+            try:
+                msg = consumer2.receive(100)
+            except pulsar.Timeout:
+                break
+            msgs.append(msg)
+            consumer2.acknowledge(msg)
+
+        self.assertEqual(len(msgs), 10)
+        client.close()
+
+    def test_sticky_autosplit(self):
+        client = Client(self.serviceUrl)
+        topic = "my-python-topic-keyshare-sticky-" + str(time.time())
+        consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
+            key_shared_mode=pulsar.KeySharedMode.Sticky,
+            allow_out_of_order_delivery=True,
+            sticky_ranges=[(0,30000)],
+        )
+
+        consumer = client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.KeyShared, consumer_name='con-1',
+                                    start_message_id_inclusive=True, 
key_shared_policy=consumer_key_shared_policy)
+
+        consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
+            key_shared_mode=pulsar.KeySharedMode.Sticky,
+            allow_out_of_order_delivery=True,
+            sticky_ranges=[(30001, 65535)],
+        )
+        consumer2 = client.subscribe(topic, "my-sub", 
consumer_type=ConsumerType.KeyShared, consumer_name='con-2',
+                                     start_message_id_inclusive=True, 
key_shared_policy=consumer2_key_shared_policy)
+        producer = client.create_producer(topic)
+
+        for i in range(10):
+            if i > 0:
+                time.sleep(0.02)
+            producer.send(b"hello-%d" % i)
+
+        msgs = []
+        while True:
+            try:
+                msg = consumer.receive(100)
+            except pulsar.Timeout:
+                break
+            msgs.append(msg)
+            consumer.acknowledge(msg)
+
+        while True:
+            try:
+                msg = consumer2.receive(100)
+            except pulsar.Timeout:
+                break
+            msgs.append(msg)
+            consumer2.acknowledge(msg)
+
+        self.assertEqual(len(msgs), 10)
+        client.close()
+
     def test_acknowledge_failed(self):
         client = Client(self.serviceUrl)
         topic = 'test_acknowledge_failed'
@@ -1461,5 +1591,6 @@ class PulsarTest(TestCase):
         client.close()
 
 
+
 if __name__ == "__main__":
     main()

Reply via email to