Copilot commented on code in PR #277:
URL: 
https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643288652


##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer, schema: 
pulsar.schema.Schema) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be received by this consumer.
+        """
+        self._consumer = consumer
+        self._schema = schema
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
+
+    async def acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of a single message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_async(msg, functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def acknowledge_cumulative(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of all the messages in the stream up to (and
+        including) the provided message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_cumulative_async(
+            msg, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def unsubscribe(self) -> None:
+        """
+        Unsubscribe the current consumer from the topic asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.unsubscribe_async(functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+        """
+        Reset the subscription associated with this consumer to a specific
+        message id or publish timestamp asynchronously.
+
+        The message id can either be a specific message or represent the first
+        or last messages in the topic.
+
+        Parameters
+        ----------
+        messageid : MessageId or int
+            The message id for seek, OR an integer event time (timestamp) to
+            seek to
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(messageid, pulsar.MessageId):
+            msg_id = messageid._msg_id
+        elif isinstance(messageid, int):
+            msg_id = messageid
+        else:
+            raise ValueError(f"invalid messageid type {type(messageid)}")
+        self._consumer.seek_async(
+            msg_id, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def close(self) -> None:
+        """
+        Close the consumer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.close_async(functools.partial(_set_future, future, 
value=None))
+        await future
+
+    def topic(self) -> str:
+        """
+        Return the topic this consumer is subscribed to.
+        """
+        return self._consumer.topic()
+
+    def subscription_name(self) -> str:
+        """
+        Return the subscription name.
+        """
+        return self._consumer.subscription_name()
+
+    def consumer_name(self) -> str:
+        """
+        Return the consumer name.
+        """
+        return self._consumer.consumer_name()

Review Comment:
   The async Consumer class is missing negative acknowledgement functionality. 
The synchronous Consumer has `negative_acknowledge` methods (lines 171-172 in 
consumer.cc), but there's no async equivalent exposed. This means async 
consumers cannot use negative acknowledgements to trigger message redelivery, 
which is an important feature available in the synchronous API. Consider adding 
`negative_acknowledge_async` methods in consumer.cc and exposing them through 
the Python async Consumer class to maintain feature parity.



##########
pulsar/asyncio.py:
##########
@@ -145,12 +407,238 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
         return Producer(await future)

Review Comment:
   The Producer is instantiated without passing the schema. The Producer needs 
to know which schema to use for encoding messages in the `send()` method. 
Modify the instantiation to pass the schema: `return Producer(await future, 
schema)` and update the Producer's `__init__` to accept and store the schema 
parameter.



##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer, schema: 
pulsar.schema.Schema) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be received by this consumer.
+        """
+        self._consumer = consumer
+        self._schema = schema
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
+
+    async def acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of a single message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_async(msg, functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def acknowledge_cumulative(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of all the messages in the stream up to (and
+        including) the provided message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_cumulative_async(
+            msg, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def unsubscribe(self) -> None:
+        """
+        Unsubscribe the current consumer from the topic asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.unsubscribe_async(functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+        """
+        Reset the subscription associated with this consumer to a specific
+        message id or publish timestamp asynchronously.
+
+        The message id can either be a specific message or represent the first
+        or last messages in the topic.
+
+        Parameters
+        ----------
+        messageid : MessageId or int
+            The message id for seek, OR an integer event time (timestamp) to
+            seek to
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(messageid, pulsar.MessageId):
+            msg_id = messageid._msg_id
+        elif isinstance(messageid, int):
+            msg_id = messageid
+        else:
+            raise ValueError(f"invalid messageid type {type(messageid)}")
+        self._consumer.seek_async(
+            msg_id, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def close(self) -> None:
+        """
+        Close the consumer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.close_async(functools.partial(_set_future, future, 
value=None))
+        await future
+
+    def topic(self) -> str:
+        """
+        Return the topic this consumer is subscribed to.
+        """
+        return self._consumer.topic()
+
+    def subscription_name(self) -> str:
+        """
+        Return the subscription name.
+        """
+        return self._consumer.subscription_name()
+
+    def consumer_name(self) -> str:
+        """
+        Return the consumer name.
+        """
+        return self._consumer.consumer_name()
+

Review Comment:
   The async Consumer class is missing several utility methods available in the 
synchronous Consumer:
   1. `is_connected()` - to check connection status
   2. `get_last_message_id()` - to get the last message ID
   3. `redeliver_unacknowledged_messages()` - to trigger redelivery of unacked 
messages
   
   These are important utility methods. Consider adding them to provide feature 
parity with the synchronous API.
   ```suggestion
   
       def is_connected(self) -> bool:
           """
           Return whether the consumer is currently connected to the broker.
           """
           return self._consumer.is_connected()
   
       def get_last_message_id(self) -> pulsar.MessageId:
           """
           Get the last message id available for this topic.
   
           Returns
           -------
           MessageId
               The last message id available for this topic.
           """
           return pulsar.MessageId(self._consumer.get_last_message_id())
   
       def redeliver_unacknowledged_messages(
           self, message_ids: List[pulsar.MessageId] | None = None
       ) -> None:
           """
           Redeliver one or all unacknowledged messages.
   
           If `message_ids` is None, all unacknowledged messages are marked for
           redelivery. Otherwise, only the specified messages are redelivered.
   
           Parameters
           ----------
           message_ids : list[MessageId], optional
               The specific messages to redeliver.
           """
           if message_ids is None:
               self._consumer.redeliver_unacknowledged_messages()
           else:
               self._consumer.redeliver_unacknowledged_messages(
                   [msg._msg_id for msg in message_ids]
               )
   ```



##########
tests/asyncio_test.py:
##########
@@ -58,29 +71,199 @@ async def test_batch_send(self):
             self.assertEqual(msg_ids[i].entry_id(), entry_id)
             self.assertEqual(msg_ids[i].batch_index(), i)
 
+        consumer = await self._client.subscribe(topic, 'sub',
+                                                
initial_position=pulsar.InitialPosition.Earliest)
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+        await consumer.close()
+
+        # create a different subscription to verify initial position is latest by default
+        consumer = await self._client.subscribe(topic, 'sub2')
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
     async def test_create_producer_failure(self):
         try:
-            await 
self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+            await 
self._client.create_producer('tenant/ns/asyncio-test-send-failure')
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.Timeout)
 
     async def test_send_failure(self):
-        producer = await 
self._client.create_producer('awaitio-test-send-failure')
+        producer = await 
self._client.create_producer('asyncio-test-send-failure')
         try:
             await producer.send(('x' * 1024 * 1024 * 10).encode())
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
 
     async def test_close_producer(self):
-        producer = await 
self._client.create_producer('awaitio-test-close-producer')
+        producer = await 
self._client.create_producer('asyncio-test-close-producer')
         await producer.close()
         try:
             await producer.close()
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
 
+    async def _prepare_messages(self, producer: Producer) -> 
List[pulsar.MessageId]:
+        msg_ids = []
+        for i in range(5):
+            msg_ids.append(await producer.send(f'msg-{i}'.encode()))
+        return msg_ids
+
+    async def test_consumer_cumulative_acknowledge(self):
+        topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        last_msg = None
+        for _ in range(5):
+            last_msg = await consumer.receive()
+        await consumer.acknowledge_cumulative(last_msg)
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub)
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
+    async def test_consumer_individual_acknowledge(self):
+        topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub,
+                                                
consumer_type=pulsar.ConsumerType.Shared)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        msgs = []
+        for _ in range(5):
+            msg = await consumer.receive()
+            msgs.append(msg)
+
+        await consumer.acknowledge(msgs[0])
+        await consumer.acknowledge(msgs[2])
+        await consumer.acknowledge(msgs[4])
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub,
+                                                
consumer_type=pulsar.ConsumerType.Shared)
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-1')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+
+    async def test_multi_topic_consumer(self):
+        topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+        producers = []
+
+        for topic in topics:
+            producer = await self._client.create_producer(topic)
+            producers.append(producer)
+
+        consumer = await self._client.subscribe(topics, 
'test-multi-subscription')
+
+        await producers[0].send(b'message-from-topic-1')
+        await producers[1].send(b'message-from-topic-2')
+
+        async def verify_receive(consumer: Consumer):
+            received_messages = {}
+            for _ in range(2):
+                msg = await consumer.receive()
+                received_messages[msg.data()] = None
+                await consumer.acknowledge(msg.message_id())
+            self.assertEqual(received_messages, {
+                b'message-from-topic-1': None,
+                b'message-from-topic-2': None
+            })
+
+        await verify_receive(consumer)
+        await consumer.close()
+
+        consumer = await 
self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+                                                'test-multi-subscription-2',
+                                                is_pattern_topic=True,
+                                                
initial_position=pulsar.InitialPosition.Earliest)

Review Comment:
   The test relies on previously sent messages still being available when the 
pattern-subscription consumer is created. This could be flaky if those messages 
have expired or if there is a timing issue. The pattern subscription at line 185 
uses `initial_position=pulsar.InitialPosition.Earliest`, which is correct, but 
for robustness, consider sending fresh messages after creating the pattern 
consumer to verify that it works correctly, rather than relying on the messages 
sent for the first consumer.
   ```suggestion
                                                   
initial_position=pulsar.InitialPosition.Earliest)
           await producers[0].send(b'message-from-topic-1')
           await producers[1].send(b'message-from-topic-2')
   ```



##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer, schema: 
pulsar.schema.Schema) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be received by this consumer.
+        """
+        self._consumer = consumer
+        self._schema = schema
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
+
+    async def acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of a single message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_async(msg, functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def acknowledge_cumulative(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of all the messages in the stream up to (and
+        including) the provided message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_cumulative_async(
+            msg, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def unsubscribe(self) -> None:
+        """
+        Unsubscribe the current consumer from the topic asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.unsubscribe_async(functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+        """
+        Reset the subscription associated with this consumer to a specific
+        message id or publish timestamp asynchronously.
+
+        The message id can either be a specific message or represent the first
+        or last messages in the topic.
+
+        Parameters
+        ----------
+        messageid : MessageId or int
+            The message id for seek, OR an integer event time (timestamp) to
+            seek to
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(messageid, pulsar.MessageId):
+            msg_id = messageid._msg_id
+        elif isinstance(messageid, int):
+            msg_id = messageid
+        else:
+            raise ValueError(f"invalid messageid type {type(messageid)}")
+        self._consumer.seek_async(
+            msg_id, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def close(self) -> None:
+        """
+        Close the consumer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.close_async(functools.partial(_set_future, future, 
value=None))
+        await future
+
+    def topic(self) -> str:
+        """
+        Return the topic this consumer is subscribed to.
+        """
+        return self._consumer.topic()
+
+    def subscription_name(self) -> str:
+        """
+        Return the subscription name.
+        """
+        return self._consumer.subscription_name()
+
+    def consumer_name(self) -> str:
+        """
+        Return the consumer name.
+        """
+        return self._consumer.consumer_name()

Review Comment:
   The new async consumer functionality lacks test coverage for several 
important features that are tested in the synchronous version:
   1. Error handling for acknowledge operations (e.g., acknowledging already 
acknowledged messages)
   2. Negative acknowledgement functionality
   3. Dead letter policy behavior
   4. Batch receive operations
   5. Consumer with key-shared policy
   6. Error conditions when receiving messages
   
   Consider adding test cases for these scenarios to ensure the async consumer 
behaves consistently with the synchronous version.



##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 

Review Comment:
   The async Producer class is missing several methods that are available in 
the synchronous Producer class:
   1. `flush()` - to flush buffered messages
   2. `producer_name()` - to get the producer name
   3. `topic()` - to get the topic name
   4. `last_sequence_id()` - to get the last sequence ID
   5. `is_connected()` - to check connection status
   
   These methods are non-blocking getters or have synchronous implementations 
in the C++ library. Consider adding them to maintain API parity with the 
synchronous Producer, especially `flush()` which is important for ensuring 
message delivery before closing.
   ```suggestion
   
       async def flush(self) -> None:
           """
           Flush all buffered messages.
   
           This method ensures that all messages previously passed to ``send``
           are flushed to the broker before it returns.
   
           Raises
           ------
           PulsarException
           """
           # The underlying C++ flush implementation is synchronous and
           # non-blocking, so it is safe to call it directly from this async
           # wrapper.
           self._producer.flush()
   
       def producer_name(self) -> str:
           """
           Get the name of the producer.
   
           Returns
           -------
           str
               The producer name.
           """
           return self._producer.producer_name()
   
       def topic(self) -> str:
           """
           Get the topic this producer is publishing to.
   
           Returns
           -------
           str
               The topic name.
           """
           return self._producer.topic()
   
       def last_sequence_id(self) -> int:
           """
           Get the last sequence id that was published by this producer.
   
           Returns
           -------
           int
               The last published sequence id.
           """
           return self._producer.last_sequence_id()
   
       def is_connected(self) -> bool:
           """
           Check whether the producer is currently connected to the broker.
   
           Returns
           -------
           bool
               True if the producer is connected, False otherwise.
           """
           return self._producer.is_connected()
   ```



##########
pulsar/asyncio.py:
##########
@@ -116,6 +128,177 @@ async def close(self) -> None:
         self._producer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+class Consumer:
+    """
+    The Pulsar message consumer, used to subscribe to messages from a topic.
+    """
+
+    def __init__(self, consumer: _pulsar.Consumer, schema: 
pulsar.schema.Schema) -> None:
+        """
+        Create the consumer.
+        Users should not call this constructor directly. Instead, create the
+        consumer via `Client.subscribe`.
+
+        Parameters
+        ----------
+        consumer: _pulsar.Consumer
+            The underlying Consumer object from the C extension.
+        schema: pulsar.schema.Schema
+            The schema of the data that will be received by this consumer.
+        """
+        self._consumer = consumer
+        self._schema = schema
+
+    async def receive(self) -> pulsar.Message:
+        """
+        Receive a single message asynchronously.
+
+        Returns
+        -------
+        pulsar.Message
+            The message received.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.receive_async(functools.partial(_set_future, future))
+        msg = await future
+        m = pulsar.Message()
+        m._message = msg
+        m._schema = self._schema
+        return m
+
+    async def acknowledge(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of a single message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_async(msg, functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def acknowledge_cumulative(
+        self,
+        message: Union[pulsar.Message, pulsar.MessageId,
+                       _pulsar.Message, _pulsar.MessageId]
+    ) -> None:
+        """
+        Acknowledge the reception of all the messages in the stream up to (and
+        including) the provided message asynchronously.
+
+        Parameters
+        ----------
+        message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
+            The received message or message id.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(message, pulsar.Message):
+            msg = message._message
+        elif isinstance(message, pulsar.MessageId):
+            msg = message._msg_id
+        else:
+            msg = message
+        self._consumer.acknowledge_cumulative_async(
+            msg, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def unsubscribe(self) -> None:
+        """
+        Unsubscribe the current consumer from the topic asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.unsubscribe_async(functools.partial(_set_future, 
future, value=None))
+        await future
+
+    async def seek(self, messageid: Union[pulsar.MessageId, int]) -> None:
+        """
+        Reset the subscription associated with this consumer to a specific
+        message id or publish timestamp asynchronously.
+
+        The message id can either be a specific message or represent the first
+        or last messages in the topic.
+
+        Parameters
+        ----------
+        messageid : MessageId or int
+            The message id for seek, OR an integer event time (timestamp) to
+            seek to
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        if isinstance(messageid, pulsar.MessageId):
+            msg_id = messageid._msg_id
+        elif isinstance(messageid, int):
+            msg_id = messageid
+        else:
+            raise ValueError(f"invalid messageid type {type(messageid)}")
+        self._consumer.seek_async(
+            msg_id, functools.partial(_set_future, future, value=None)
+        )
+        await future
+
+    async def close(self) -> None:
+        """
+        Close the consumer asynchronously.
+
+        Raises
+        ------
+        PulsarException
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.close_async(functools.partial(_set_future, future, 
value=None))
+        await future
+
+    def topic(self) -> str:
+        """
+        Return the topic this consumer is subscribed to.
+        """
+        return self._consumer.topic()
+
+    def subscription_name(self) -> str:
+        """
+        Return the subscription name.
+        """
+        return self._consumer.subscription_name()
+
+    def consumer_name(self) -> str:
+        """
+        Return the consumer name.
+        """
+        return self._consumer.consumer_name()

Review Comment:
   The async Consumer class is missing the `batch_receive()` method that is 
available in the synchronous Consumer. Batch receiving is an important feature 
for high-throughput scenarios where applications want to receive multiple 
messages at once to reduce processing overhead. Consider adding an async 
`batch_receive()` method to maintain feature parity with the synchronous API.



##########
pulsar/asyncio.py:
##########
@@ -145,12 +407,238 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )

Review Comment:
   The check that rejects enabling batching and chunking together (lines 
449-452) runs only after every option has already been applied to the 
configuration object. This means that if both are enabled, all of that setup 
work is done before the ValueError is raised. Consider moving this validation 
to the beginning of the method, right after the schema initialization, to fail 
fast before any configuration is applied.



##########
pulsar/asyncio.py:
##########
@@ -145,12 +407,238 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
         return Producer(await future)
 
+    # pylint: 
disable=too-many-arguments,too-many-locals,too-many-branches,too-many-positional-arguments
+    async def subscribe(self, topic: Union[str, List[str]],
+                        subscription_name: str,
+                        consumer_type: pulsar.ConsumerType =
+                        pulsar.ConsumerType.Exclusive,
+                        schema: pulsar.schema.Schema | None = None,
+                        receiver_queue_size: int = 1000,
+                        max_total_receiver_queue_size_across_partitions: int =
+                        50000,
+                        consumer_name: str | None = None,
+                        unacked_messages_timeout_ms: int | None = None,
+                        broker_consumer_stats_cache_time_ms: int = 30000,
+                        negative_ack_redelivery_delay_ms: int = 60000,
+                        is_read_compacted: bool = False,
+                        properties: dict | None = None,
+                        initial_position: InitialPosition = 
InitialPosition.Latest,
+                        crypto_key_reader: pulsar.CryptoKeyReader | None = 
None,
+                        replicate_subscription_state_enabled: bool = False,
+                        max_pending_chunked_message: int = 10,
+                        auto_ack_oldest_chunked_message_on_queue_full: bool = 
False,
+                        start_message_id_inclusive: bool = False,
+                        batch_receive_policy: 
pulsar.ConsumerBatchReceivePolicy | None =
+                        None,
+                        key_shared_policy: pulsar.ConsumerKeySharedPolicy | 
None =
+                        None,
+                        batch_index_ack_enabled: bool = False,
+                        regex_subscription_mode: RegexSubscriptionMode =
+                        RegexSubscriptionMode.PersistentOnly,
+                        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | 
None =
+                        None,
+                        crypto_failure_action: ConsumerCryptoFailureAction =
+                        ConsumerCryptoFailureAction.FAIL,
+                        is_pattern_topic: bool = False) -> Consumer:
+        """
+        Subscribe to the given topic and subscription combination.
+
+        Parameters
+        ----------
+        topic: str, List[str], or regex pattern
+            The name of the topic, list of topics or regex pattern.
+            When `is_pattern_topic` is True, `topic` is treated as a regex.
+        subscription_name: str
+            The name of the subscription.
+        consumer_type: pulsar.ConsumerType, 
default=pulsar.ConsumerType.Exclusive
+            Select the subscription type to be used when subscribing to the 
topic.
+        schema: pulsar.schema.Schema | None, default=None
+            Define the schema of the data that will be received by this 
consumer.
+        receiver_queue_size: int, default=1000
+            Sets the size of the consumer receive queue.
+        max_total_receiver_queue_size_across_partitions: int, default=50000
+            Set the max total receiver queue size across partitions.
+        consumer_name: str | None, default=None
+            Sets the consumer name.
+        unacked_messages_timeout_ms: int | None, default=None
+            Sets the timeout in milliseconds for unacknowledged messages.
+        broker_consumer_stats_cache_time_ms: int, default=30000
+            Sets the time duration for which the broker-side consumer stats
+            will be cached in the client.
+        negative_ack_redelivery_delay_ms: int, default=60000
+            The delay after which to redeliver the messages that failed to be
+            processed.
+        is_read_compacted: bool, default=False
+            Selects whether to read the compacted version of the topic.
+        properties: dict | None, default=None
+            Sets the properties for the consumer.
+        initial_position: InitialPosition, default=InitialPosition.Latest
+            Set the initial position of a consumer when subscribing to the 
topic.
+        crypto_key_reader: pulsar.CryptoKeyReader | None, default=None
+            Symmetric encryption class implementation.
+        replicate_subscription_state_enabled: bool, default=False
+            Set whether the subscription status should be replicated.
+        max_pending_chunked_message: int, default=10
+            Consumer buffers chunk messages into memory until it receives all 
the chunks.
+        auto_ack_oldest_chunked_message_on_queue_full: bool, default=False
+            Automatically acknowledge oldest chunked messages on queue
+            full.
+        start_message_id_inclusive: bool, default=False
+            Set the consumer to include the given position of any reset
+            operation.
+        batch_receive_policy: pulsar.ConsumerBatchReceivePolicy | None, 
default=None
+            Set the batch collection policy for batch receiving.
+        key_shared_policy: pulsar.ConsumerKeySharedPolicy | None, default=None
+            Set the key shared policy for use when the ConsumerType is
+            KeyShared.
+        batch_index_ack_enabled: bool, default=False
+            Enable the batch index acknowledgement.
+        regex_subscription_mode: RegexSubscriptionMode,
+            default=RegexSubscriptionMode.PersistentOnly
+            Set the regex subscription mode for use when the topic is a regex
+            pattern.
+        dead_letter_policy: pulsar.ConsumerDeadLetterPolicy | None, 
default=None
+            Set dead letter policy for consumer.
+        crypto_failure_action: ConsumerCryptoFailureAction,
+            default=ConsumerCryptoFailureAction.FAIL
+            Set the behavior when the decryption fails.
+        is_pattern_topic: bool, default=False
+            Whether `topic` is a regex pattern. If it's True when `topic` is a 
list, a ValueError
+            will be raised.
+
+        Returns
+        -------
+        Consumer
+            The consumer created
+
+        Raises
+        ------
+        PulsarException
+        """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
+        future = asyncio.get_running_loop().create_future()
+        conf = _pulsar.ConsumerConfiguration()
+        conf.consumer_type(consumer_type)
+        conf.regex_subscription_mode(regex_subscription_mode)
+        conf.read_compacted(is_read_compacted)
+        conf.receiver_queue_size(receiver_queue_size)
+        conf.max_total_receiver_queue_size_across_partitions(
+            max_total_receiver_queue_size_across_partitions
+        )
+        if consumer_name:
+            conf.consumer_name(consumer_name)
+        if unacked_messages_timeout_ms:
+            conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
+
+        conf.negative_ack_redelivery_delay_ms(negative_ack_redelivery_delay_ms)
+        
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+        conf.subscription_initial_position(initial_position)
+
+        conf.schema(schema.schema_info())
+
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        
conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)
+        conf.max_pending_chunked_message(max_pending_chunked_message)
+        conf.auto_ack_oldest_chunked_message_on_queue_full(
+            auto_ack_oldest_chunked_message_on_queue_full
+        )
+        conf.start_message_id_inclusive(start_message_id_inclusive)
+        if batch_receive_policy:
+            conf.batch_receive_policy(batch_receive_policy.policy())
+
+        if key_shared_policy:
+            conf.key_shared_policy(key_shared_policy.policy())
+        conf.batch_index_ack_enabled(batch_index_ack_enabled)
+        if dead_letter_policy:
+            conf.dead_letter_policy(dead_letter_policy.policy())
+        conf.crypto_failure_action(crypto_failure_action)
+
+        if isinstance(topic, str):
+            if is_pattern_topic:
+                self._client.subscribe_async_pattern(
+                    topic, subscription_name, conf,
+                    functools.partial(_set_future, future)
+                )
+            else:
+                self._client.subscribe_async(
+                    topic, subscription_name, conf,
+                    functools.partial(_set_future, future)
+                )
+        elif isinstance(topic, list):
+            if is_pattern_topic:
+                raise ValueError(
+                    "Argument 'topic' must be a string when "
+                    "'is_pattern_topic' is True; lists of topics do not "
+                    "support pattern subscriptions"
+                )
+            self._client.subscribe_async_topics(
+                topic, subscription_name, conf,
+                functools.partial(_set_future, future)
+            )
+        else:
+            raise ValueError(
+                "Argument 'topic' is expected to be of a type between "
+                "(str, list)"

Review Comment:
   The error message says "of a type between (str, list)", which is 
grammatically incorrect. Better phrasing would be "of type str or list" or 
"one of (str, list)"; the word "between" is not appropriate when listing types.
   ```suggestion
                   "Argument 'topic' is expected to be of type 'str' or 'list'"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to