This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 4c28ed5  feat: support producer access mode. (#138)
4c28ed5 is described below

commit 4c28ed52f41c5ec9943a3ddb36faedcdfcf3178e
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Aug 9 11:05:54 2023 +0800

    feat: support producer access mode. (#138)
---
 pulsar/__init__.py   | 18 ++++++++++++++--
 src/config.cc        |  4 +++-
 src/enums.cc         |  6 ++++++
 tests/pulsar_test.py | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 83 insertions(+), 3 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index a1b18c1..4d941a5 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -48,7 +48,7 @@ from typing import List, Tuple, Optional
 import _pulsar
 
 from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
-    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode  # noqa: F401
+    LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode  # noqa: F401
 
 from pulsar.__about__ import __version__
 
@@ -523,7 +523,8 @@ class Client:
                         properties=None,
                         batching_type=BatchingType.Default,
                         encryption_key=None,
-                        crypto_key_reader=None
+                        crypto_key_reader=None,
+                        access_mode=ProducerAccessMode.Shared,
                         ):
         """
         Create a new producer on a given topic.
@@ -614,6 +615,17 @@ class Client:
         crypto_key_reader: CryptoKeyReader, optional
             Symmetric encryption class implementation, configuring public key encryption messages for the producer
             and private key decryption messages for the consumer
+        access_mode: ProducerAccessMode, optional
+            Set the type of access mode that the producer requires on the topic.
+
+            Supported modes:
+
+            * Shared: By default multiple producers can publish on a topic.
+            * Exclusive: Require exclusive access for producer.
+                         Fail immediately if there's already a producer connected.
+            * WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
+            * ExclusiveWithFencing: Acquire exclusive access for the producer.
+                                    Any existing producer will be removed and invalidated immediately.
         """
         _check_type(str, topic, 'topic')
         _check_type_or_none(str, producer_name, 'producer_name')
@@ -634,6 +646,7 @@ class Client:
         _check_type_or_none(str, encryption_key, 'encryption_key')
         _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
         _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
+        _check_type(ProducerAccessMode, access_mode, 'access_mode')
 
         conf = _pulsar.ProducerConfiguration()
         conf.send_timeout_millis(send_timeout_millis)
@@ -649,6 +662,7 @@ class Client:
         conf.batching_type(batching_type)
         conf.chunking_enabled(chunking_enabled)
         conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
         if producer_name:
             conf.producer_name(producer_name)
         if initial_sequence_id:
diff --git a/src/config.cc b/src/config.cc
index c71d5b0..a440c53 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -223,7 +223,9 @@ void export_config(py::module_& m) {
         .def("batching_type", &ProducerConfiguration::setBatchingType, return_value_policy::reference)
         .def("batching_type", &ProducerConfiguration::getBatchingType)
         .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_value_policy::reference)
-        .def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference);
+        .def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference)
+        .def("access_mode", &ProducerConfiguration::setAccessMode, return_value_policy::reference)
+        .def("access_mode", &ProducerConfiguration::getAccessMode, return_value_policy::copy);
 
     class_<BatchReceivePolicy>(m, "BatchReceivePolicy")
         .def(init<int, int, long>())
diff --git a/src/enums.cc b/src/enums.cc
index 8dacc54..33affd0 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -124,6 +124,12 @@ void export_enums(py::module_& m) {
         .value("Default", ProducerConfiguration::DefaultBatching)
         .value("KeyBased", ProducerConfiguration::KeyBasedBatching);
 
+    enum_<ProducerConfiguration::ProducerAccessMode>(m, "ProducerAccessMode", "Producer Access Mode")
+        .value("Shared", ProducerConfiguration::ProducerAccessMode::Shared)
+        .value("Exclusive", ProducerConfiguration::ProducerAccessMode::Exclusive)
+        .value("WaitForExclusive", ProducerConfiguration::ProducerAccessMode::WaitForExclusive)
+        .value("ExclusiveWithFencing", ProducerConfiguration::ProducerAccessMode::ExclusiveWithFencing);
+
     enum_<Logger::Level>(m, "LoggerLevel")
         .value("Debug", Logger::LEVEL_DEBUG)
         .value("Info", Logger::LEVEL_INFO)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 60801c9..b47c735 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -42,6 +42,7 @@ from pulsar import (
     InitialPosition,
     CryptoKeyReader,
     ConsumerBatchReceivePolicy,
+    ProducerAccessMode,
 )
 from pulsar.schema import JsonSchema, Record, Integer
 
@@ -166,6 +167,63 @@ class PulsarTest(TestCase):
         self.assertEqual(msg_id, msg.message_id())
         client.close()
 
+    def test_producer_access_mode_exclusive(self):
+        client = Client(self.serviceUrl)
+        topic_name = "test-access-mode-exclusive"
+        client.create_producer(topic_name, producer_name="p1", access_mode=ProducerAccessMode.Exclusive)
+        with self.assertRaises(pulsar.ProducerFenced):
+            client.create_producer(topic_name, producer_name="p2", access_mode=ProducerAccessMode.Exclusive)
+        client.close()
+
+    def test_producer_access_mode_wait_exclusive(self):
+        client = Client(self.serviceUrl)
+        topic_name = "test_producer_access_mode_wait_exclusive"
+        producer1 = client.create_producer(
+            topic=topic_name,
+            producer_name='p-1',
+            access_mode=ProducerAccessMode.Exclusive
+        )
+        assert producer1.producer_name() == 'p-1'
+
+        # when p1 close, p2 success created.
+        producer1.close()
+        producer2 = client.create_producer(
+            topic=topic_name,
+            producer_name='p-2',
+            access_mode=ProducerAccessMode.WaitForExclusive
+        )
+        assert producer2.producer_name() == 'p-2'
+
+        producer2.close()
+        client.close()
+
+    def test_producer_access_mode_exclusive_with_fencing(self):
+        client = Client(self.serviceUrl)
+        topic_name = 'test_producer_access_mode_exclusive_with_fencing'
+
+        producer1 = client.create_producer(
+            topic=topic_name,
+            producer_name='p-1',
+            access_mode=ProducerAccessMode.Exclusive
+        )
+        assert producer1.producer_name() == 'p-1'
+
+        producer2 = client.create_producer(
+            topic=topic_name,
+            producer_name='p-2',
+            access_mode=ProducerAccessMode.ExclusiveWithFencing
+        )
+        assert producer2.producer_name() == 'p-2'
+
+        # producer1 will be fenced.
+        with self.assertRaises((pulsar.ProducerFenced, pulsar.AlreadyClosed)):
+            producer1.send('test-msg'.encode('utf-8'))
+        # sleep 200ms to make sure producer1 is close done.
+        time.sleep(0.2)
+
+        producer2.close()
+        client.close()
+
     def test_producer_is_connected(self):
         client = Client(self.serviceUrl)
         topic = "test_producer_is_connected"

Reply via email to