Copilot commented on code in PR #277:
URL:
https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643288652
##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
+class Consumer:
+ """
+ The Pulsar message consumer, used to subscribe to messages from a topic.
+ """
+
+ def __init__(self, consumer: _pulsar.Consumer, schema:
pulsar.schema.Schema) -> None:
+ """
+ Create the consumer.
+ Users should not call this constructor directly. Instead, create the
+ consumer via `Client.subscribe`.
+
+ Parameters
+ ----------
+ consumer: _pulsar.Consumer
+ The underlying Consumer object from the C extension.
+ schema: pulsar.schema.Schema
+ The schema of the data that will be received by this consumer.
+ """
+ self._consumer = consumer
+ self._schema = schema
+
+ async def receive(self) -> pulsar.Message:
+ """
+ Receive a single message asynchronously.
+
+ Returns
+ -------
+ pulsar.Message
+ The message received.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.receive_async(functools.partial(_set_future, future))
+ msg = await future
+ m = pulsar.Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
+
+ async def acknowledge(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of a single message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_async(msg, functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def acknowledge_cumulative(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of all the messages in the stream up to (and
+ including) the provided message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_cumulative_async(
+ msg, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def unsubscribe(self) -> None:
+ """
+ Unsubscribe the current consumer from the topic asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.unsubscribe_async(functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+ """
+ Reset the subscription associated with this consumer to a specific
+ message id or publish timestamp asynchronously.
+
+ The message id can either be a specific message or represent the first
+ or last messages in the topic.
+
+ Parameters
+ ----------
+ messageid : MessageId or int
+ The message id for seek, OR an integer event time (timestamp) to
+ seek to
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(messageid, pulsar.MessageId):
+ msg_id = messageid._msg_id
+ elif isinstance(messageid, int):
+ msg_id = messageid
+ else:
+ raise ValueError(f"invalid messageid type {type(messageid)}")
+ self._consumer.seek_async(
+ msg_id, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def close(self) -> None:
+ """
+ Close the consumer asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.close_async(functools.partial(_set_future, future,
value=None))
+ await future
+
+ def topic(self) -> str:
+ """
+ Return the topic this consumer is subscribed to.
+ """
+ return self._consumer.topic()
+
+ def subscription_name(self) -> str:
+ """
+ Return the subscription name.
+ """
+ return self._consumer.subscription_name()
+
+ def consumer_name(self) -> str:
+ """
+ Return the consumer name.
+ """
+ return self._consumer.consumer_name()
Review Comment:
The async Consumer class is missing negative acknowledgement functionality.
The synchronous Consumer has `negative_acknowledge` methods (lines 171-172 in
consumer.cc), but there's no async equivalent exposed. This means async
consumers cannot use negative acknowledgements to trigger message redelivery,
which is an important feature available in the synchronous API. Consider adding
`negative_acknowledge_async` methods in consumer.cc and exposing them through
the Python async Consumer class to maintain feature parity.
##########
pulsar/asyncio.py:
##########
@@ -145,12 +407,238 @@ async def create_producer(self, topic: str) -> Producer:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf,
functools.partial(_set_future, future))
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.batching_type(batching_type)
+ conf.chunking_enabled(chunking_enabled)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg, num_partitions):
+ return int(message_router(pulsar.Message._wrap(msg),
+ num_partitions))
+ conf.message_router(underlying_router)
+
+ if producer_name:
+ conf.producer_name(producer_name)
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
+ conf.schema(schema.schema_info())
+ if encryption_key:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+ if batching_enabled and chunking_enabled:
+ raise ValueError(
+ "Batching and chunking of messages can't be enabled together."
+ )
+
+ self._client.create_producer_async(
+ topic, conf, functools.partial(_set_future, future)
+ )
return Producer(await future)
Review Comment:
The Producer is instantiated without passing the schema. The Producer needs
to know which schema to use for encoding messages in the `send()` method.
Modify the instantiation to pass the schema: `return Producer(await future,
schema)` and update the Producer's `__init__` to accept and store the schema
parameter.
##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
+class Consumer:
+ """
+ The Pulsar message consumer, used to subscribe to messages from a topic.
+ """
+
+ def __init__(self, consumer: _pulsar.Consumer, schema:
pulsar.schema.Schema) -> None:
+ """
+ Create the consumer.
+ Users should not call this constructor directly. Instead, create the
+ consumer via `Client.subscribe`.
+
+ Parameters
+ ----------
+ consumer: _pulsar.Consumer
+ The underlying Consumer object from the C extension.
+ schema: pulsar.schema.Schema
+ The schema of the data that will be received by this consumer.
+ """
+ self._consumer = consumer
+ self._schema = schema
+
+ async def receive(self) -> pulsar.Message:
+ """
+ Receive a single message asynchronously.
+
+ Returns
+ -------
+ pulsar.Message
+ The message received.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.receive_async(functools.partial(_set_future, future))
+ msg = await future
+ m = pulsar.Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
+
+ async def acknowledge(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of a single message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_async(msg, functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def acknowledge_cumulative(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of all the messages in the stream up to (and
+ including) the provided message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_cumulative_async(
+ msg, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def unsubscribe(self) -> None:
+ """
+ Unsubscribe the current consumer from the topic asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.unsubscribe_async(functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+ """
+ Reset the subscription associated with this consumer to a specific
+ message id or publish timestamp asynchronously.
+
+ The message id can either be a specific message or represent the first
+ or last messages in the topic.
+
+ Parameters
+ ----------
+ messageid : MessageId or int
+ The message id for seek, OR an integer event time (timestamp) to
+ seek to
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(messageid, pulsar.MessageId):
+ msg_id = messageid._msg_id
+ elif isinstance(messageid, int):
+ msg_id = messageid
+ else:
+ raise ValueError(f"invalid messageid type {type(messageid)}")
+ self._consumer.seek_async(
+ msg_id, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def close(self) -> None:
+ """
+ Close the consumer asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.close_async(functools.partial(_set_future, future,
value=None))
+ await future
+
+ def topic(self) -> str:
+ """
+ Return the topic this consumer is subscribed to.
+ """
+ return self._consumer.topic()
+
+ def subscription_name(self) -> str:
+ """
+ Return the subscription name.
+ """
+ return self._consumer.subscription_name()
+
+ def consumer_name(self) -> str:
+ """
+ Return the consumer name.
+ """
+ return self._consumer.consumer_name()
+
Review Comment:
The async Consumer class is missing several utility methods available in the
synchronous Consumer:
1. `is_connected()` - to check connection status
2. `get_last_message_id()` - to get the last message ID
3. `redeliver_unacknowledged_messages()` - to trigger redelivery of unacked
messages
These are important utility methods. Consider adding them to provide feature
parity with the synchronous API.
```suggestion
    def is_connected(self) -> bool:
        """
        Return whether the consumer is currently connected to the broker.
        """
        return self._consumer.is_connected()

    def get_last_message_id(self) -> pulsar.MessageId:
        """
        Get the last message id available for this topic.

        Returns
        -------
        MessageId
            The last message id available for this topic.
        """
        return pulsar.MessageId(self._consumer.get_last_message_id())

    def redeliver_unacknowledged_messages(
        self, message_ids: List[pulsar.MessageId] | None = None
    ) -> None:
        """
        Redeliver one or all unacknowledged messages.

        If `message_ids` is None, all unacknowledged messages are marked for
        redelivery. Otherwise, only the specified messages are redelivered.

        Parameters
        ----------
        message_ids : list[MessageId], optional
            The specific messages to redeliver.
        """
        if message_ids is None:
            self._consumer.redeliver_unacknowledged_messages()
        else:
            self._consumer.redeliver_unacknowledged_messages(
                [msg._msg_id for msg in message_ids]
            )
```
##########
tests/asyncio_test.py:
##########
@@ -58,29 +71,199 @@ async def test_batch_send(self):
self.assertEqual(msg_ids[i].entry_id(), entry_id)
self.assertEqual(msg_ids[i].batch_index(), i)
+ consumer = await self._client.subscribe(topic, 'sub',
+
initial_position=pulsar.InitialPosition.Earliest)
+ for i in range(5):
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), f'msg-{i}'.encode())
+ await consumer.close()
+
+ # create a different subscription to verify initial position is latest
by default
+ consumer = await self._client.subscribe(topic, 'sub2')
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
async def test_create_producer_failure(self):
try:
- await
self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+ await
self._client.create_producer('tenant/ns/asyncio-test-send-failure')
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.Timeout)
async def test_send_failure(self):
- producer = await
self._client.create_producer('awaitio-test-send-failure')
+ producer = await
self._client.create_producer('asyncio-test-send-failure')
try:
await producer.send(('x' * 1024 * 1024 * 10).encode())
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
async def test_close_producer(self):
- producer = await
self._client.create_producer('awaitio-test-close-producer')
+ producer = await
self._client.create_producer('asyncio-test-close-producer')
await producer.close()
try:
await producer.close()
self.fail()
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
+ async def _prepare_messages(self, producer: Producer) ->
List[pulsar.MessageId]:
+ msg_ids = []
+ for i in range(5):
+ msg_ids.append(await producer.send(f'msg-{i}'.encode()))
+ return msg_ids
+
+ async def test_consumer_cumulative_acknowledge(self):
+ topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ last_msg = None
+ for _ in range(5):
+ last_msg = await consumer.receive()
+ await consumer.acknowledge_cumulative(last_msg)
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub)
+ await producer.send(b'final-message')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'final-message')
+
+ async def test_consumer_individual_acknowledge(self):
+ topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ msgs = []
+ for _ in range(5):
+ msg = await consumer.receive()
+ msgs.append(msg)
+
+ await consumer.acknowledge(msgs[0])
+ await consumer.acknowledge(msgs[2])
+ await consumer.acknowledge(msgs[4])
+ await consumer.close()
+
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-1')
+ msg = await consumer.receive()
+ self.assertEqual(msg.data(), b'msg-3')
+
+ async def test_multi_topic_consumer(self):
+ topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+ producers = []
+
+ for topic in topics:
+ producer = await self._client.create_producer(topic)
+ producers.append(producer)
+
+ consumer = await self._client.subscribe(topics,
'test-multi-subscription')
+
+ await producers[0].send(b'message-from-topic-1')
+ await producers[1].send(b'message-from-topic-2')
+
+ async def verify_receive(consumer: Consumer):
+ received_messages = {}
+ for _ in range(2):
+ msg = await consumer.receive()
+ received_messages[msg.data()] = None
+ await consumer.acknowledge(msg.message_id())
+ self.assertEqual(received_messages, {
+ b'message-from-topic-1': None,
+ b'message-from-topic-2': None
+ })
+
+ await verify_receive(consumer)
+ await consumer.close()
+
+ consumer = await
self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+ 'test-multi-subscription-2',
+ is_pattern_topic=True,
+
initial_position=pulsar.InitialPosition.Earliest)
Review Comment:
The test relies on previously sent messages still being available when the
pattern subscription consumer is created. This could be flaky if the messages
have expired or if there is a timing issue. The pattern subscription at line 185
uses `initial_position=pulsar.InitialPosition.Earliest`, which is correct, but
for robustness, consider sending fresh messages after creating the pattern
consumer to verify that it works correctly, rather than relying on the messages
that were already received and acknowledged by the first consumer.
```suggestion
initial_position=pulsar.InitialPosition.Earliest)
await producers[0].send(b'message-from-topic-1')
await producers[1].send(b'message-from-topic-2')
```
##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
+class Consumer:
+ """
+ The Pulsar message consumer, used to subscribe to messages from a topic.
+ """
+
+ def __init__(self, consumer: _pulsar.Consumer, schema:
pulsar.schema.Schema) -> None:
+ """
+ Create the consumer.
+ Users should not call this constructor directly. Instead, create the
+ consumer via `Client.subscribe`.
+
+ Parameters
+ ----------
+ consumer: _pulsar.Consumer
+ The underlying Consumer object from the C extension.
+ schema: pulsar.schema.Schema
+ The schema of the data that will be received by this consumer.
+ """
+ self._consumer = consumer
+ self._schema = schema
+
+ async def receive(self) -> pulsar.Message:
+ """
+ Receive a single message asynchronously.
+
+ Returns
+ -------
+ pulsar.Message
+ The message received.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.receive_async(functools.partial(_set_future, future))
+ msg = await future
+ m = pulsar.Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
+
+ async def acknowledge(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of a single message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_async(msg, functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def acknowledge_cumulative(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of all the messages in the stream up to (and
+ including) the provided message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_cumulative_async(
+ msg, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def unsubscribe(self) -> None:
+ """
+ Unsubscribe the current consumer from the topic asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.unsubscribe_async(functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+ """
+ Reset the subscription associated with this consumer to a specific
+ message id or publish timestamp asynchronously.
+
+ The message id can either be a specific message or represent the first
+ or last messages in the topic.
+
+ Parameters
+ ----------
+ messageid : MessageId or int
+ The message id for seek, OR an integer event time (timestamp) to
+ seek to
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(messageid, pulsar.MessageId):
+ msg_id = messageid._msg_id
+ elif isinstance(messageid, int):
+ msg_id = messageid
+ else:
+ raise ValueError(f"invalid messageid type {type(messageid)}")
+ self._consumer.seek_async(
+ msg_id, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def close(self) -> None:
+ """
+ Close the consumer asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.close_async(functools.partial(_set_future, future,
value=None))
+ await future
+
+ def topic(self) -> str:
+ """
+ Return the topic this consumer is subscribed to.
+ """
+ return self._consumer.topic()
+
+ def subscription_name(self) -> str:
+ """
+ Return the subscription name.
+ """
+ return self._consumer.subscription_name()
+
+ def consumer_name(self) -> str:
+ """
+ Return the consumer name.
+ """
+ return self._consumer.consumer_name()
Review Comment:
The new async consumer functionality lacks test coverage for several
important features that are tested in the synchronous version:
1. Error handling for acknowledge operations (e.g., acknowledging already
acknowledged messages)
2. Negative acknowledgement functionality
3. Dead letter policy behavior
4. Batch receive operations
5. Consumer with key-shared policy
6. Error conditions when receiving messages
Consider adding test cases for these scenarios to ensure the async consumer
behaves consistently with the synchronous version.
##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
Review Comment:
The async Producer class is missing several methods that are available in
the synchronous Producer class:
1. `flush()` - to flush buffered messages
2. `producer_name()` - to get the producer name
3. `topic()` - to get the topic name
4. `last_sequence_id()` - to get the last sequence ID
5. `is_connected()` - to check connection status
These methods are non-blocking getters or have synchronous implementations
in the C++ library. Consider adding them to maintain API parity with the
synchronous Producer, especially `flush()` which is important for ensuring
message delivery before closing.
```suggestion
    async def flush(self) -> None:
        """
        Flush all buffered messages.

        This method ensures that all messages previously passed to ``send`` are
        flushed to the broker before it returns.

        Raises
        ------
        PulsarException
        """
        # The underlying C++ flush implementation is synchronous and non-blocking,
        # so it is safe to call it directly from this async wrapper.
        self._producer.flush()

    def producer_name(self) -> str:
        """
        Get the name of the producer.

        Returns
        -------
        str
            The producer name.
        """
        return self._producer.producer_name()

    def topic(self) -> str:
        """
        Get the topic this producer is publishing to.

        Returns
        -------
        str
            The topic name.
        """
        return self._producer.topic()

    def last_sequence_id(self) -> int:
        """
        Get the last sequence id that was published by this producer.

        Returns
        -------
        int
            The last published sequence id.
        """
        return self._producer.last_sequence_id()

    def is_connected(self) -> bool:
        """
        Check whether the producer is currently connected to the broker.

        Returns
        -------
        bool
            True if the producer is connected, False otherwise.
        """
        return self._producer.is_connected()
```
##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
self._producer.close_async(functools.partial(_set_future, future,
value=None))
await future
+class Consumer:
+ """
+ The Pulsar message consumer, used to subscribe to messages from a topic.
+ """
+
+ def __init__(self, consumer: _pulsar.Consumer, schema:
pulsar.schema.Schema) -> None:
+ """
+ Create the consumer.
+ Users should not call this constructor directly. Instead, create the
+ consumer via `Client.subscribe`.
+
+ Parameters
+ ----------
+ consumer: _pulsar.Consumer
+ The underlying Consumer object from the C extension.
+ schema: pulsar.schema.Schema
+ The schema of the data that will be received by this consumer.
+ """
+ self._consumer = consumer
+ self._schema = schema
+
+ async def receive(self) -> pulsar.Message:
+ """
+ Receive a single message asynchronously.
+
+ Returns
+ -------
+ pulsar.Message
+ The message received.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.receive_async(functools.partial(_set_future, future))
+ msg = await future
+ m = pulsar.Message()
+ m._message = msg
+ m._schema = self._schema
+ return m
+
+ async def acknowledge(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of a single message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_async(msg, functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def acknowledge_cumulative(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId,
+ _pulsar.Message, _pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the reception of all the messages in the stream up to (and
+ including) the provided message asynchronously.
+
+ Parameters
+ ----------
+ message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+ The received message or message id.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ self._consumer.acknowledge_cumulative_async(
+ msg, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def unsubscribe(self) -> None:
+ """
+ Unsubscribe the current consumer from the topic asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.unsubscribe_async(functools.partial(_set_future,
future, value=None))
+ await future
+
+ async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+ """
+ Reset the subscription associated with this consumer to a specific
+ message id or publish timestamp asynchronously.
+
+ The message id can either be a specific message or represent the first
+ or last messages in the topic.
+
+ Parameters
+ ----------
+ messageid : MessageId or int
+ The message id for seek, OR an integer event time (timestamp) to
+ seek to
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ if isinstance(messageid, pulsar.MessageId):
+ msg_id = messageid._msg_id
+ elif isinstance(messageid, int):
+ msg_id = messageid
+ else:
+ raise ValueError(f"invalid messageid type {type(messageid)}")
+ self._consumer.seek_async(
+ msg_id, functools.partial(_set_future, future, value=None)
+ )
+ await future
+
+ async def close(self) -> None:
+ """
+ Close the consumer asynchronously.
+
+ Raises
+ ------
+ PulsarException
+ """
+ future = asyncio.get_running_loop().create_future()
+ self._consumer.close_async(functools.partial(_set_future, future,
value=None))
+ await future
+
+ def topic(self) -> str:
+ """
+ Return the topic this consumer is subscribed to.
+ """
+ return self._consumer.topic()
+
+ def subscription_name(self) -> str:
+ """
+ Return the subscription name.
+ """
+ return self._consumer.subscription_name()
+
+ def consumer_name(self) -> str:
+ """
+ Return the consumer name.
+ """
+ return self._consumer.consumer_name()
Review Comment:
The async Consumer class is missing the `batch_receive()` method that is
available in the synchronous Consumer. Batch receiving is an important feature
for high-throughput scenarios where applications want to receive multiple
messages at once to reduce processing overhead. Consider adding an async
`batch_receive()` method to maintain feature parity with the synchronous API.
##########
pulsar/asyncio.py:
##########
@@ -145,12 +407,238 @@ async def create_producer(self, topic: str) -> Producer:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf,
functools.partial(_set_future, future))
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.batching_type(batching_type)
+ conf.chunking_enabled(chunking_enabled)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg, num_partitions):
+ return int(message_router(pulsar.Message._wrap(msg),
+ num_partitions))
+ conf.message_router(underlying_router)
+
+ if producer_name:
+ conf.producer_name(producer_name)
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
+ conf.schema(schema.schema_info())
+ if encryption_key:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+ if batching_enabled and chunking_enabled:
+ raise ValueError(
+ "Batching and chunking of messages can't be enabled together."
+ )
Review Comment:
The validation for the batching/chunking conflict (lines 449-452) occurs
after the configuration object has already been populated. This means that if
both options are enabled, all of the settings are applied to the configuration
object before the ValueError is raised. Consider moving this validation to the
beginning of the method, right after the schema initialization, to fail fast
before any configuration is applied.
##########
pulsar/asyncio.py:
##########
@@ -145,12 +407,238 @@ async def create_producer(self, topic: str) -> Producer:
------
PulsarException
"""
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
future = asyncio.get_running_loop().create_future()
conf = _pulsar.ProducerConfiguration()
- # TODO: add more configs
- self._client.create_producer_async(topic, conf,
functools.partial(_set_future, future))
+ conf.send_timeout_millis(send_timeout_millis)
+ conf.compression_type(compression_type)
+ conf.max_pending_messages(max_pending_messages)
+
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+ conf.block_if_queue_full(block_if_queue_full)
+ conf.batching_enabled(batching_enabled)
+ conf.batching_max_messages(batching_max_messages)
+
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+ conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+ conf.partitions_routing_mode(message_routing_mode)
+ conf.batching_type(batching_type)
+ conf.chunking_enabled(chunking_enabled)
+ conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+ conf.access_mode(access_mode)
+ if message_router is not None:
+ def underlying_router(msg, num_partitions):
+ return int(message_router(pulsar.Message._wrap(msg),
+ num_partitions))
+ conf.message_router(underlying_router)
+
+ if producer_name:
+ conf.producer_name(producer_name)
+ if initial_sequence_id is not None:
+ conf.initial_sequence_id(initial_sequence_id)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+
+ conf.schema(schema.schema_info())
+ if encryption_key:
+ conf.encryption_key(encryption_key)
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+ if batching_enabled and chunking_enabled:
+ raise ValueError(
+ "Batching and chunking of messages can't be enabled together."
+ )
+
+ self._client.create_producer_async(
+ topic, conf, functools.partial(_set_future, future)
+ )
return Producer(await future)
+ # pylint:
disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
+ async def subscribe(self, topic: Union[str, List[str]],
+ subscription_name: str,
+ consumer_type: pulsar.ConsumerType =
+ pulsar.ConsumerType.Exclusive,
+ schema: pulsar.schema.Schema | None = None,
+ receiver_queue_size: int = 1000,
+ max_total_receiver_queue_size_across_partitions: int =
+ 50000,
+ consumer_name: str | None = None,
+ unacked_messages_timeout_ms: int | None = None,
+ broker_consumer_stats_cache_time_ms: int = 30000,
+ negative_ack_redelivery_delay_ms: int = 60000,
+ is_read_compacted: bool = False,
+ properties: dict | None = None,
+ initial_position: InitialPosition =
InitialPosition.Latest,
+ crypto_key_reader: pulsar.CryptoKeyReader | None =
None,
+ replicate_subscription_state_enabled: bool = False,
+ max_pending_chunked_message: int = 10,
+ auto_ack_oldest_chunked_message_on_queue_full: bool =
False,
+ start_message_id_inclusive: bool = False,
+ batch_receive_policy:
pulsar.ConsumerBatchReceivePolicy | None =
+ None,
+ key_shared_policy: pulsar.ConsumerKeySharedPolicy |
None =
+ None,
+ batch_index_ack_enabled: bool = False,
+ regex_subscription_mode: RegexSubscriptionMode =
+ RegexSubscriptionMode.PersistentOnly,
+ dead_letter_policy: pulsar.ConsumerDeadLetterPolicy |
None =
+ None,
+ crypto_failure_action: ConsumerCryptoFailureAction =
+ ConsumerCryptoFailureAction.FAIL,
+ is_pattern_topic: bool = False) -> Consumer:
+ """
+ Subscribe to the given topic and subscription combination.
+
+ Parameters
+ ----------
+ topic: str, List[str], or regex pattern
+ The name of the topic, list of topics or regex pattern.
+ When `is_pattern_topic` is True, `topic` is treated as a regex.
+ subscription_name: str
+ The name of the subscription.
+ consumer_type: pulsar.ConsumerType,
default=pulsar.ConsumerType.Exclusive
+ Select the subscription type to be used when subscribing to the
topic.
+ schema: pulsar.schema.Schema | None, default=None
+ Define the schema of the data that will be received by this
consumer.
+ receiver_queue_size: int, default=1000
+ Sets the size of the consumer receive queue.
+ max_total_receiver_queue_size_across_partitions: int, default=50000
+ Set the max total receiver queue size across partitions.
+ consumer_name: str | None, default=None
+ Sets the consumer name.
+ unacked_messages_timeout_ms: int | None, default=None
+ Sets the timeout in milliseconds for unacknowledged messages.
+ broker_consumer_stats_cache_time_ms: int, default=30000
+ Sets the time duration for which the broker-side consumer stats
+ will be cached in the client.
+ negative_ack_redelivery_delay_ms: int, default=60000
+ The delay after which to redeliver the messages that failed to be
+ processed.
+ is_read_compacted: bool, default=False
+ Selects whether to read the compacted version of the topic.
+ properties: dict | None, default=None
+ Sets the properties for the consumer.
+ initial_position: InitialPosition, default=InitialPosition.Latest
+ Set the initial position of a consumer when subscribing to the
topic.
+ crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+ Symmetric encryption class implementation.
+ replicate_subscription_state_enabled: bool, default=False
+ Set whether the subscription status should be replicated.
+ max_pending_chunked_message: int, default=10
+ Consumer buffers chunk messages into memory until it receives all
the chunks.
+ auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+ Automatically acknowledge oldest chunked messages on queue
+ full.
+ start_message_id_inclusive: bool, default=False
+ Set the consumer to include the given position of any reset
+ operation.
+ batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None,
default=None
+ Set the batch collection policy for batch receiving.
+ key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
+ Set the key shared policy for use when the ConsumerType is
+ KeyShared.
+ batch_index_ack_enabled: bool, default=False
+ Enable the batch index acknowledgement.
+ regex_subscription_mode: RegexSubscriptionMode,
+ default=RegexSubscriptionMode.PersistentOnly
+ Set the regex subscription mode for use when the topic is a regex
+ pattern.
+ dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None,
default=None
+ Set dead letter policy for consumer.
+ crypto_failure_action: ConsumerCryptoFailureAction,
+ default=ConsumerCryptoFailureAction.FAIL
+ Set the behavior when the decryption fails.
+ is_pattern_topic: bool, default=False
+ Whether `topic` is a regex pattern. If it's True when `topic` is a
list, a ValueError
+ will be raised.
+
+ Returns
+ -------
+ Consumer
+ The consumer created
+
+ Raises
+ ------
+ PulsarException
+ """
+ if schema is None:
+ schema = pulsar.schema.BytesSchema()
+
+ future = asyncio.get_running_loop().create_future()
+ conf = _pulsar.ConsumerConfiguration()
+ conf.consumer_type(consumer_type)
+ conf.regex_subscription_mode(regex_subscription_mode)
+ conf.read_compacted(is_read_compacted)
+ conf.receiver_queue_size(receiver_queue_size)
+ conf.max_total_receiver_queue_size_across_partitions(
+ max_total_receiver_queue_size_across_partitions
+ )
+ if consumer_name:
+ conf.consumer_name(consumer_name)
+ if unacked_messages_timeout_ms:
+ conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+
+ conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
+
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+ if properties:
+ for k, v in properties.items():
+ conf.property(k, v)
+ conf.subscription_initial_position(initial_position)
+
+ conf.schema(schema.schema_info())
+
+ if crypto_key_reader:
+ conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
+ conf.max_pending_chunked_message(max_pending_chunked_message)
+ conf.auto_ack_oldest_chunked_message_on_queue_full(
+ auto_ack_oldest_chunked_message_on_queue_full
+ )
+ conf.start_message_id_inclusive(start_message_id_inclusive)
+ if batch_receive_policy:
+ conf.batch_receive_policy(batch_receive_policy.policy())
+
+ if key_shared_policy:
+ conf.key_shared_policy(key_shared_policy.policy())
+ conf.batch_index_ack_enabled(batch_index_ack_enabled)
+ if dead_letter_policy:
+ conf.dead_letter_policy(dead_letter_policy.policy())
+ conf.crypto_failure_action(crypto_failure_action)
+
+ if isinstance(topic, str):
+ if is_pattern_topic:
+ self._client.subscribe_async_pattern(
+ topic, subscription_name, conf,
+ functools.partial(_set_future, future)
+ )
+ else:
+ self._client.subscribe_async(
+ topic, subscription_name, conf,
+ functools.partial(_set_future, future)
+ )
+ elif isinstance(topic, list):
+ if is_pattern_topic:
+ raise ValueError(
+ "Argument 'topic' must be a string when "
+ "'is_pattern_topic' is True; lists of topics do not "
+ "support pattern subscriptions"
+ )
+ self._client.subscribe_async_topics(
+ topic, subscription_name, conf,
+ functools.partial(_set_future, future)
+ )
+ else:
+ raise ValueError(
+ "Argument 'topic' is expected to be of a type between "
+ "(str, list)"
Review Comment:
The error message uses "between (str, list)", which is grammatically
incorrect. The phrasing should be "of type str or list" or "one of
(str, list)". The word "between" is not appropriate when listing types.
```suggestion
"Argument 'topic' is expected to be of type 'str' or 'list'"
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]