Copilot commented on code in PR #277:
URL:
https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643054382
##########
pulsar/asyncio.py:
##########
@@ -145,12 +401,239 @@ async def create_producer(self, topic: str) -> Producer:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf,
functools.partial(_set_future, future))
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.batching_type(batching_type)
+ conf.chunking_enabled(chunking_enabled)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg, num_partitions):
+ return int(message_router(pulsar.Message._wrap(msg),
+ num_partitions))
+ conf.message_router(underlying_router)
+
+ if producer_name:
+ conf.producer_name(producer_name)
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
+ conf.schema(schema.schema_info())
+ if encryption_key:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+ if batching_enabled and chunking_enabled:
+ raise ValueError(
+ "Batching and chunking of messages can't be enabled together."
+ )
+
+ self._client.create_producer_async(
+ topic, conf, functools.partial(_set_future, future)
+ )
return Producer(await future)
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
+ async def subscribe(self, topic: Union[str, List[str]],
+ subscription_name: str,
+ consumer_type: pulsar.ConsumerType =
+ pulsar.ConsumerType.Exclusive,
+ schema: pulsar.schema.Schema = None,
+ message_listener = None,
Review Comment:
The message_listener parameter has no type hint, unlike other parameters in
this function signature. For consistency and better IDE support, it should have
a type hint (likely Optional[Callable[[Consumer, pulsar.Message], None]]).
##########
pulsar/asyncio.py:
##########
@@ -145,12 +401,239 @@ async def create_producer(self, topic: str) -> Producer:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf,
functools.partial(_set_future, future))
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.batching_type(batching_type)
+ conf.chunking_enabled(chunking_enabled)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg, num_partitions):
+ return int(message_router(pulsar.Message._wrap(msg),
+ num_partitions))
+ conf.message_router(underlying_router)
+
+ if producer_name:
+ conf.producer_name(producer_name)
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
+ conf.schema(schema.schema_info())
+ if encryption_key:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+ if batching_enabled and chunking_enabled:
+ raise ValueError(
+ "Batching and chunking of messages can't be enabled together."
+ )
+
+ self._client.create_producer_async(
+ topic, conf, functools.partial(_set_future, future)
+ )
return Producer(await future)
+ # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
+ async def subscribe(self, topic: Union[str, List[str]],
+ subscription_name: str,
+ consumer_type: pulsar.ConsumerType =
+ pulsar.ConsumerType.Exclusive,
+ schema: pulsar.schema.Schema = None,
+ message_listener = None,
+ receiver_queue_size: int = 1000,
+ max_total_receiver_queue_size_across_partitions: int =
+ 50000,
+ consumer_name: str = None,
+ unacked_messages_timeout_ms: int = None,
+ broker_consumer_stats_cache_time_ms: int = 30000,
+ negative_ack_redelivery_delay_ms: int = 60000,
+ is_read_compacted: bool = False,
+ properties: dict = None,
+ pattern_auto_discovery_period: int = 60, # pylint:
disable=unused-argument
Review Comment:
The pattern_auto_discovery_period parameter is accepted but never actually
configured on the ConsumerConfiguration object. While
conf.pattern_auto_discovery_period() is available in the C++ bindings, it's not
called anywhere in this method. This parameter should either be applied to the
configuration or removed from the function signature. Note: This issue also
exists in the sync version (pulsar/__init__.py), but should still be addressed
here for completeness.
```suggestion
```
##########
pulsar/asyncio.py:
##########
@@ -127,14 +306,91 @@ def __init__(self, service_url, **kwargs) -> None:
"""
self._client: _pulsar.Client = pulsar.Client(service_url,
**kwargs)._client
- async def create_producer(self, topic: str) -> Producer:
+ # pylint: disable=too-many-arguments,too-many-locals
+ async def create_producer(self, topic: str,
+ producer_name: str = None,
+ schema: pulsar.schema.Schema = None,
+ initial_sequence_id: int = None,
+ send_timeout_millis: int = 30000,
+ compression_type: CompressionType =
CompressionType.NONE,
+ max_pending_messages: int = 1000,
+ max_pending_messages_across_partitions: int =
50000,
+ block_if_queue_full: bool = False,
+ batching_enabled: bool = True,
+ batching_max_messages: int = 1000,
+ batching_max_allowed_size_in_bytes: int =
128*1024,
+ batching_max_publish_delay_ms: int = 10,
+ chunking_enabled: bool = False,
+ message_routing_mode: PartitionsRoutingMode =
+ PartitionsRoutingMode.RoundRobinDistribution,
+ lazy_start_partitioned_producers: bool = False,
+ properties: dict = None,
+ batching_type: BatchingType =
BatchingType.Default,
+ encryption_key: str = None,
+ crypto_key_reader: pulsar.CryptoKeyReader = None,
+ access_mode: ProducerAccessMode =
ProducerAccessMode.Shared,
+ message_router: Callable[[pulsar.Message, int],
int] = None,
+ ) -> Producer:
"""
Create a new producer on a given topic
Parameters
----------
topic: str
The topic name
+ producer_name: str, optional
+ Specify a name for the producer. If not assigned, the system will
+ generate a globally unique name which can be accessed with
+ `Producer.producer_name()`. When specifying a name, it is app to
+ the user to ensure that, for a given topic, the producer name is
+ unique across all Pulsar's clusters.
+ schema: pulsar.schema.Schema, optional
+ Define the schema of the data that will be published by this
producer.
+ initial_sequence_id: int, optional
+ Set the baseline for the sequence ids for messages published by
+ the producer.
+ send_timeout_millis: int, default=30000
+ If a message is not acknowledged by the server before the
+ send_timeout expires, an error will be reported.
+ compression_type: CompressionType, default=CompressionType.NONE
+ Set the compression type for the producer.
+ max_pending_messages: int, default=1000
+ Set the max size of the queue holding the messages pending to
+ receive an acknowledgment from the broker.
+ max_pending_messages_across_partitions: int, default=50000
+ Set the max size of the queue holding the messages pending to
+ receive an acknowledgment across partitions.
+ block_if_queue_full: bool, default=False
+ Set whether send operations should block when the outgoing
+ message queue is full.
+ batching_enabled: bool, default=False
+ Enable automatic message batching.
Review Comment:
The documentation for batching_enabled incorrectly states "default=False",
but the actual default value is True (as shown in the parameter definition on
line 319). The True default is itself inconsistent with the sync version in
pulsar/__init__.py, which both defaults to and documents batching_enabled=False.
The PR description says batching is enabled by default for the async producer
to correct the previous, incorrect default configuration. However, this creates
an inconsistency between the sync and async APIs that should either be
documented or reconsidered.
```suggestion
batching_enabled: bool, default=True
Enable automatic message batching. Note that, unlike the
synchronous
producer API in ``pulsar.__init__``, batching is enabled by
default
for the asyncio producer.
```
##########
tests/asyncio_test.py:
##########
@@ -58,29 +63,181 @@ async def test_batch_send(self):
self.assertEqual(msg_ids[i].entry_id(), entry_id)
self.assertEqual(msg_ids[i].batch_index(), i)
+ consumer = await self._client.subscribe(topic, 'sub',
+
initial_position=pulsar.InitialPosition.Earliest)
+ for i in range(5):
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), f'msg-{i}'.encode())
+ await consumer.close()
+
+ # create a different subscription to verify initial position is latest
by default
+ consumer = await self._client.subscribe(topic, 'sub2')
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
async def test_create_producer_failure(self):
try:
- await
self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+ await
self._client.create_producer('tenant/ns/asyncio-test-send-failure')
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.Timeout)
async def test_send_failure(self):
- producer = await
self._client.create_producer('awaitio-test-send-failure')
+ producer = await
self._client.create_producer('asyncio-test-send-failure')
try:
await producer.send(('x' * 1024 * 1024 * 10).encode())
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
async def test_close_producer(self):
- producer = await
self._client.create_producer('awaitio-test-close-producer')
+ producer = await
self._client.create_producer('asyncio-test-close-producer')
await producer.close()
try:
await producer.close()
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+ async def _prepare_messages(self, producer: Producer) ->
List[pulsar.MessageId]:
+ msg_ids = []
+ for i in range(5):
+ msg_id = await producer.send(f'msg-{i}'.encode())
+ msg_ids.append(msg_id)
+ return msg_ids
+
+ async def test_consumer_cumulative_acknowledge(self):
+ topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ last_msg = None
+ for _ in range(5):
+ last_msg = await consumer.receive()
+ await consumer.acknowledge_cumulative(last_msg)
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub)
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
+ async def test_consumer_individual_acknowledge(self):
+ topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ msgs = []
+ for _ in range(5):
+ msg = await consumer.receive()
+ msgs.append(msg)
+
+ await consumer.acknowledge(msgs[0])
+ await consumer.acknowledge(msgs[2])
+ await consumer.acknowledge(msgs[4])
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-1')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
+
+ async def test_multi_topic_consumer(self):
+ topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+ producers = []
+
+ for topic in topics:
+ producer = await self._client.create_producer(topic)
+ producers.append(producer)
+
+ consumer = await self._client.subscribe(topics,
'test-multi-subscription')
+
+ await producers[0].send(b'message-from-topic-1')
+ await producers[1].send(b'message-from-topic-2')
+
+ async def verify_receive(consumer: Consumer):
+ received_messages = {}
+ for _ in range(2):
+ msg = await consumer.receive()
+ received_messages[msg.data()] = None
+ await consumer.acknowledge(msg.message_id())
+ self.assertEqual(received_messages, {
+ b'message-from-topic-1': None,
+ b'message-from-topic-2': None
+ })
+
+ await verify_receive(consumer)
+ await consumer.close()
+
+ consumer = await
self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+ 'test-multi-subscription-2',
+ is_pattern_topic=True,
+
initial_position=pulsar.InitialPosition.Earliest)
+ await verify_receive(consumer)
+ await consumer.close()
+
+ async def test_unsubscribe(self):
+ topic = f'asyncio-test-unsubscribe-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ await consumer.unsubscribe()
+ consumer = await self._client.subscribe(topic, sub)
+
+ async def test_seek_message_id(self):
+ topic = f'asyncio-test-seek-message-id-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(
+ topic, sub, initial_position=pulsar.InitialPosition.Earliest
+ )
+
+ producer = await self._client.create_producer(topic)
+ msg_ids = await self._prepare_messages(producer)
+
+ for i in range(5):
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), f'msg-{i}'.encode())
+
+ await consumer.seek(msg_ids[2])
+
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
Review Comment:
The test seeks to msg_ids[2] and then expects the next received message to be
'msg-3', so it assumes the seek excludes the message at the given ID.
Seeking to a message ID typically means the next receive starts from the
message after that ID. However, this depends on whether the subscription was
configured with start_message_id_inclusive. The test should either verify that
this is the intended behavior or add a comment explaining the seek semantics
being tested.
##########
tests/asyncio_test.py:
##########
@@ -58,29 +63,181 @@ async def test_batch_send(self):
self.assertEqual(msg_ids[i].entry_id(), entry_id)
self.assertEqual(msg_ids[i].batch_index(), i)
+ consumer = await self._client.subscribe(topic, 'sub',
+
initial_position=pulsar.InitialPosition.Earliest)
+ for i in range(5):
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), f'msg-{i}'.encode())
+ await consumer.close()
+
+ # create a different subscription to verify initial position is latest
by default
+ consumer = await self._client.subscribe(topic, 'sub2')
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
async def test_create_producer_failure(self):
try:
- await
self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+ await
self._client.create_producer('tenant/ns/asyncio-test-send-failure')
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.Timeout)
async def test_send_failure(self):
- producer = await
self._client.create_producer('awaitio-test-send-failure')
+ producer = await
self._client.create_producer('asyncio-test-send-failure')
try:
await producer.send(('x' * 1024 * 1024 * 10).encode())
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
async def test_close_producer(self):
- producer = await
self._client.create_producer('awaitio-test-close-producer')
+ producer = await
self._client.create_producer('asyncio-test-close-producer')
await producer.close()
try:
await producer.close()
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+ async def _prepare_messages(self, producer: Producer) ->
List[pulsar.MessageId]:
+ msg_ids = []
+ for i in range(5):
+ msg_id = await producer.send(f'msg-{i}'.encode())
+ msg_ids.append(msg_id)
+ return msg_ids
+
+ async def test_consumer_cumulative_acknowledge(self):
+ topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ last_msg = None
+ for _ in range(5):
+ last_msg = await consumer.receive()
+ await consumer.acknowledge_cumulative(last_msg)
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub)
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
+ async def test_consumer_individual_acknowledge(self):
+ topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ msgs = []
+ for _ in range(5):
+ msg = await consumer.receive()
+ msgs.append(msg)
+
+ await consumer.acknowledge(msgs[0])
+ await consumer.acknowledge(msgs[2])
+ await consumer.acknowledge(msgs[4])
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-1')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
+
+ async def test_multi_topic_consumer(self):
+ topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+ producers = []
+
+ for topic in topics:
+ producer = await self._client.create_producer(topic)
+ producers.append(producer)
+
+ consumer = await self._client.subscribe(topics,
'test-multi-subscription')
+
+ await producers[0].send(b'message-from-topic-1')
+ await producers[1].send(b'message-from-topic-2')
+
+ async def verify_receive(consumer: Consumer):
+ received_messages = {}
+ for _ in range(2):
+ msg = await consumer.receive()
+ received_messages[msg.data()] = None
+ await consumer.acknowledge(msg.message_id())
+ self.assertEqual(received_messages, {
+ b'message-from-topic-1': None,
+ b'message-from-topic-2': None
+ })
+
+ await verify_receive(consumer)
+ await consumer.close()
+
+ consumer = await
self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+ 'test-multi-subscription-2',
+ is_pattern_topic=True,
+
initial_position=pulsar.InitialPosition.Earliest)
+ await verify_receive(consumer)
+ await consumer.close()
+
+ async def test_unsubscribe(self):
+ topic = f'asyncio-test-unsubscribe-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ await consumer.unsubscribe()
+ consumer = await self._client.subscribe(topic, sub)
Review Comment:
The consumer returned by the second subscribe call is never used or closed.
```suggestion
consumer = await self._client.subscribe(topic, sub)
await consumer.close()
```
##########
pulsar/asyncio.py:
##########
@@ -145,12 +401,239 @@ async def create_producer(self, topic: str) -> Producer:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf,
functools.partial(_set_future, future))
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.batching_type(batching_type)
+ conf.chunking_enabled(chunking_enabled)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg, num_partitions):
+ return int(message_router(pulsar.Message._wrap(msg),
+ num_partitions))
+ conf.message_router(underlying_router)
+
+ if producer_name:
+ conf.producer_name(producer_name)
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
Review Comment:
The conditional check for initial_sequence_id on line 431 should use "is not
None" instead of a truthiness check. With a truthiness check, a user passing
initial_sequence_id=0 would have the value treated as falsy and silently not
set, which is incorrect. (The hunk quoted above already uses "is not None", so
this comment may refer to an earlier revision.) The sync version (line 870 of
pulsar/__init__.py) has the same issue, but it should still be fixed here.
##########
src/consumer.cc:
##########
@@ -89,21 +90,66 @@ void Consumer_seek(Consumer& consumer, const MessageId&
msgId) {
waitForAsyncResult([msgId, &consumer](ResultCallback callback) {
consumer.seekAsync(msgId, callback); });
}
+MessageId Consumer_get_last_message_id(Consumer& consumer) {
+ MessageId msgId;
+ Result res;
+ Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
+ Py_END_ALLOW_THREADS CHECK_RESULT(res);
Review Comment:
Missing line break after Py_END_ALLOW_THREADS. The code has
"Py_END_ALLOW_THREADS CHECK_RESULT(res);" on a single line. This is
inconsistent with the rest of the codebase, where these statements are
typically placed on separate lines for readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]