This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 4c28ed5 feat: support producer access mode. (#138)
4c28ed5 is described below
commit 4c28ed52f41c5ec9943a3ddb36faedcdfcf3178e
Author: Baodi Shi <[email protected]>
AuthorDate: Wed Aug 9 11:05:54 2023 +0800
feat: support producer access mode. (#138)
---
pulsar/__init__.py | 18 ++++++++++++++--
src/config.cc | 4 +++-
src/enums.cc | 6 ++++++
tests/pulsar_test.py | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++
4 files changed, 83 insertions(+), 3 deletions(-)
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index a1b18c1..4d941a5 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -48,7 +48,7 @@ from typing import List, Tuple, Optional
import _pulsar
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition,
PartitionsRoutingMode, BatchingType, \
- LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa:
F401
+ LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode,
ProducerAccessMode # noqa: F401
from pulsar.__about__ import __version__
@@ -523,7 +523,8 @@ class Client:
properties=None,
batching_type=BatchingType.Default,
encryption_key=None,
- crypto_key_reader=None
+ crypto_key_reader=None,
+ access_mode=ProducerAccessMode.Shared,
):
"""
Create a new producer on a given topic.
@@ -614,6 +615,17 @@ class Client:
crypto_key_reader: CryptoKeyReader, optional
Symmetric encryption class implementation, configuring public key
encryption messages for the producer
and private key decryption messages for the consumer
+ access_mode: ProducerAccessMode, optional
+ Set the type of access mode that the producer requires on the
topic.
+
+ Supported modes:
+
+ * Shared: By default multiple producers can publish on a topic.
+ * Exclusive: Require exclusive access for producer.
+ Fail immediately if there's already a producer
connected.
+ * WaitForExclusive: Producer creation is pending until it can
acquire exclusive access.
+ * ExclusiveWithFencing: Acquire exclusive access for the producer.
+ Any existing producer will be removed and
invalidated immediately.
"""
_check_type(str, topic, 'topic')
_check_type_or_none(str, producer_name, 'producer_name')
@@ -634,6 +646,7 @@ class Client:
_check_type_or_none(str, encryption_key, 'encryption_key')
_check_type_or_none(CryptoKeyReader, crypto_key_reader,
'crypto_key_reader')
_check_type(bool, lazy_start_partitioned_producers,
'lazy_start_partitioned_producers')
+ _check_type(ProducerAccessMode, access_mode, 'access_mode')
conf = _pulsar.ProducerConfiguration()
conf.send_timeout_millis(send_timeout_millis)
@@ -649,6 +662,7 @@ class Client:
conf.batching_type(batching_type)
conf.chunking_enabled(chunking_enabled)
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
if producer_name:
conf.producer_name(producer_name)
if initial_sequence_id:
diff --git a/src/config.cc b/src/config.cc
index c71d5b0..a440c53 100644
--- a/src/config.cc
+++ b/src/config.cc
@@ -223,7 +223,9 @@ void export_config(py::module_& m) {
.def("batching_type", &ProducerConfiguration::setBatchingType,
return_value_policy::reference)
.def("batching_type", &ProducerConfiguration::getBatchingType)
.def("encryption_key", &ProducerConfiguration::addEncryptionKey,
return_value_policy::reference)
- .def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader,
return_value_policy::reference);
+ .def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader,
return_value_policy::reference)
+ .def("access_mode", &ProducerConfiguration::setAccessMode,
return_value_policy::reference)
+ .def("access_mode", &ProducerConfiguration::getAccessMode,
return_value_policy::copy);
class_<BatchReceivePolicy>(m, "BatchReceivePolicy")
.def(init<int, int, long>())
diff --git a/src/enums.cc b/src/enums.cc
index 8dacc54..33affd0 100644
--- a/src/enums.cc
+++ b/src/enums.cc
@@ -124,6 +124,12 @@ void export_enums(py::module_& m) {
.value("Default", ProducerConfiguration::DefaultBatching)
.value("KeyBased", ProducerConfiguration::KeyBasedBatching);
+ enum_<ProducerConfiguration::ProducerAccessMode>(m, "ProducerAccessMode",
"Producer Access Mode")
+ .value("Shared", ProducerConfiguration::ProducerAccessMode::Shared)
+ .value("Exclusive",
ProducerConfiguration::ProducerAccessMode::Exclusive)
+ .value("WaitForExclusive",
ProducerConfiguration::ProducerAccessMode::WaitForExclusive)
+ .value("ExclusiveWithFencing",
ProducerConfiguration::ProducerAccessMode::ExclusiveWithFencing);
+
enum_<Logger::Level>(m, "LoggerLevel")
.value("Debug", Logger::LEVEL_DEBUG)
.value("Info", Logger::LEVEL_INFO)
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 60801c9..b47c735 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -42,6 +42,7 @@ from pulsar import (
InitialPosition,
CryptoKeyReader,
ConsumerBatchReceivePolicy,
+ ProducerAccessMode,
)
from pulsar.schema import JsonSchema, Record, Integer
@@ -166,6 +167,63 @@ class PulsarTest(TestCase):
self.assertEqual(msg_id, msg.message_id())
client.close()
+ def test_producer_access_mode_exclusive(self):
+ client = Client(self.serviceUrl)
+ topic_name = "test-access-mode-exclusive"
+ client.create_producer(topic_name, producer_name="p1",
access_mode=ProducerAccessMode.Exclusive)
+ with self.assertRaises(pulsar.ProducerFenced):
+ client.create_producer(topic_name, producer_name="p2",
access_mode=ProducerAccessMode.Exclusive)
+ client.close()
+
+ def test_producer_access_mode_wait_exclusive(self):
+ client = Client(self.serviceUrl)
+ topic_name = "test_producer_access_mode_wait_exclusive"
+ producer1 = client.create_producer(
+ topic=topic_name,
+ producer_name='p-1',
+ access_mode=ProducerAccessMode.Exclusive
+ )
+ assert producer1.producer_name() == 'p-1'
+
+ # when p1 close, p2 success created.
+ producer1.close()
+ producer2 = client.create_producer(
+ topic=topic_name,
+ producer_name='p-2',
+ access_mode=ProducerAccessMode.WaitForExclusive
+ )
+ assert producer2.producer_name() == 'p-2'
+
+ producer2.close()
+ client.close()
+
+ def test_producer_access_mode_exclusive_with_fencing(self):
+ client = Client(self.serviceUrl)
+ topic_name = 'test_producer_access_mode_exclusive_with_fencing'
+
+ producer1 = client.create_producer(
+ topic=topic_name,
+ producer_name='p-1',
+ access_mode=ProducerAccessMode.Exclusive
+ )
+ assert producer1.producer_name() == 'p-1'
+
+ producer2 = client.create_producer(
+ topic=topic_name,
+ producer_name='p-2',
+ access_mode=ProducerAccessMode.ExclusiveWithFencing
+ )
+ assert producer2.producer_name() == 'p-2'
+
+ # producer1 will be fenced.
+ with self.assertRaises((pulsar.ProducerFenced, pulsar.AlreadyClosed)):
+ producer1.send('test-msg'.encode('utf-8'))
+ # sleep 200ms to make sure producer1 is close done.
+ time.sleep(0.2)
+
+ producer2.close()
+ client.close()
+
def test_producer_is_connected(self):
client = Client(self.serviceUrl)
topic = "test_producer_is_connected"