Copilot commented on code in PR #277:
URL: 
https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643054382


##########
pulsar/asyncio.py:
##########
@@ -145,12 +401,239 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
         return Producer(await future)
 
+    # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
+    async def subscribe(self, topic: Union[str, List[str]],
+                        subscription_name: str,
+                        consumer_type: pulsar.ConsumerType =
+                        pulsar.ConsumerType.Exclusive,
+                        schema: pulsar.schema.Schema = None,
+                        message_listener = None,

Review Comment:
   The message_listener parameter has no type hint, unlike other parameters in 
this function signature. For consistency and better IDE support, it should have 
a type hint (likely Optional[Callable[[Consumer, pulsar.Message], None]]).



##########
pulsar/asyncio.py:
##########
@@ -145,12 +401,239 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
+        conf.schema(schema.schema_info())
+        if encryption_key:
+            conf.encryption_key(encryption_key)
+        if crypto_key_reader:
+            conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
+
+        if batching_enabled and chunking_enabled:
+            raise ValueError(
+                "Batching and chunking of messages can't be enabled together."
+            )
+
+        self._client.create_producer_async(
+            topic, conf, functools.partial(_set_future, future)
+        )
         return Producer(await future)
 
+    # pylint: disable=too-many-arguments,too-many-locals,too-many-branches
+    async def subscribe(self, topic: Union[str, List[str]],
+                        subscription_name: str,
+                        consumer_type: pulsar.ConsumerType =
+                        pulsar.ConsumerType.Exclusive,
+                        schema: pulsar.schema.Schema = None,
+                        message_listener = None,
+                        receiver_queue_size: int = 1000,
+                        max_total_receiver_queue_size_across_partitions: int =
+                        50000,
+                        consumer_name: str = None,
+                        unacked_messages_timeout_ms: int = None,
+                        broker_consumer_stats_cache_time_ms: int = 30000,
+                        negative_ack_redelivery_delay_ms: int = 60000,
+                        is_read_compacted: bool = False,
+                        properties: dict = None,
+                        pattern_auto_discovery_period: int = 60,  # pylint: 
disable=unused-argument

Review Comment:
   The pattern_auto_discovery_period parameter is accepted but never actually 
configured on the ConsumerConfiguration object. While 
conf.pattern_auto_discovery_period() is available in the C++ bindings, it's not 
called anywhere in this method. This parameter should either be applied to the 
configuration or removed from the function signature. Note: This issue also 
exists in the sync version (pulsar/__init__.py), but should still be addressed 
here for completeness.
   ```suggestion
   
   ```



##########
pulsar/asyncio.py:
##########
@@ -127,14 +306,91 @@ def __init__(self, service_url, **kwargs) -> None:
         """
         self._client: _pulsar.Client = pulsar.Client(service_url, 
**kwargs)._client
 
-    async def create_producer(self, topic: str) -> Producer:
+    # pylint: disable=too-many-arguments,too-many-locals
+    async def create_producer(self, topic: str,
+                              producer_name: str = None,
+                              schema: pulsar.schema.Schema = None,
+                              initial_sequence_id: int = None,
+                              send_timeout_millis: int = 30000,
+                              compression_type: CompressionType = 
CompressionType.NONE,
+                              max_pending_messages: int = 1000,
+                              max_pending_messages_across_partitions: int = 
50000,
+                              block_if_queue_full: bool = False,
+                              batching_enabled: bool = True,
+                              batching_max_messages: int = 1000,
+                              batching_max_allowed_size_in_bytes: int = 
128*1024,
+                              batching_max_publish_delay_ms: int = 10,
+                              chunking_enabled: bool = False,
+                              message_routing_mode: PartitionsRoutingMode =
+                              PartitionsRoutingMode.RoundRobinDistribution,
+                              lazy_start_partitioned_producers: bool = False,
+                              properties: dict = None,
+                              batching_type: BatchingType = 
BatchingType.Default,
+                              encryption_key: str = None,
+                              crypto_key_reader: pulsar.CryptoKeyReader = None,
+                              access_mode: ProducerAccessMode = 
ProducerAccessMode.Shared,
+                              message_router: Callable[[pulsar.Message, int], 
int] = None,
+                              ) -> Producer:
         """
         Create a new producer on a given topic
 
         Parameters
         ----------
         topic: str
             The topic name
+        producer_name: str, optional
+            Specify a name for the producer. If not assigned, the system will
+            generate a globally unique name which can be accessed with
+            `Producer.producer_name()`. When specifying a name, it is up to
+            the user to ensure that, for a given topic, the producer name is
+            unique across all Pulsar's clusters.
+        schema: pulsar.schema.Schema, optional
+            Define the schema of the data that will be published by this 
producer.
+        initial_sequence_id: int, optional
+            Set the baseline for the sequence ids for messages published by
+            the producer.
+        send_timeout_millis: int, default=30000
+            If a message is not acknowledged by the server before the
+            send_timeout expires, an error will be reported.
+        compression_type: CompressionType, default=CompressionType.NONE
+            Set the compression type for the producer.
+        max_pending_messages: int, default=1000
+            Set the max size of the queue holding the messages pending to
+            receive an acknowledgment from the broker.
+        max_pending_messages_across_partitions: int, default=50000
+            Set the max size of the queue holding the messages pending to
+            receive an acknowledgment across partitions.
+        block_if_queue_full: bool, default=False
+            Set whether send operations should block when the outgoing
+            message queue is full.
+        batching_enabled: bool, default=False
+            Enable automatic message batching.

Review Comment:
   The docstring for batching_enabled states "default=False", but the actual
default in the signature is True (see the parameter definition on line 319).
The True default also differs from the sync version in pulsar/__init__.py,
where batching_enabled defaults to False and is documented as False. The PR
description explains that batching is enabled by default for the async
producer to correct the previous incorrect default configuration, but the
resulting difference between the sync and async APIs should be documented or
reconsidered.
   ```suggestion
           batching_enabled: bool, default=True
               Enable automatic message batching. Note that, unlike the
               synchronous producer API in ``pulsar.__init__``, batching is
               enabled by default for the asyncio producer.
   ```



##########
tests/asyncio_test.py:
##########
@@ -58,29 +63,181 @@ async def test_batch_send(self):
             self.assertEqual(msg_ids[i].entry_id(), entry_id)
             self.assertEqual(msg_ids[i].batch_index(), i)
 
+        consumer = await self._client.subscribe(topic, 'sub',
+                                                
initial_position=pulsar.InitialPosition.Earliest)
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+        await consumer.close()
+
+        # create a different subscription to verify initial position is latest 
by default
+        consumer = await self._client.subscribe(topic, 'sub2')
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
     async def test_create_producer_failure(self):
         try:
-            await 
self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+            await 
self._client.create_producer('tenant/ns/asyncio-test-send-failure')
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.Timeout)
 
     async def test_send_failure(self):
-        producer = await 
self._client.create_producer('awaitio-test-send-failure')
+        producer = await 
self._client.create_producer('asyncio-test-send-failure')
         try:
             await producer.send(('x' * 1024 * 1024 * 10).encode())
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
 
     async def test_close_producer(self):
-        producer = await 
self._client.create_producer('awaitio-test-close-producer')
+        producer = await 
self._client.create_producer('asyncio-test-close-producer')
         await producer.close()
         try:
             await producer.close()
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
 
+    async def _prepare_messages(self, producer: Producer) -> 
List[pulsar.MessageId]:
+        msg_ids = []
+        for i in range(5):
+            msg_id = await producer.send(f'msg-{i}'.encode())
+            msg_ids.append(msg_id)
+        return msg_ids
+
+    async def test_consumer_cumulative_acknowledge(self):
+        topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        last_msg = None
+        for _ in range(5):
+            last_msg = await consumer.receive()
+        await consumer.acknowledge_cumulative(last_msg)
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub)
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
+    async def test_consumer_individual_acknowledge(self):
+        topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub, 
+                                                
consumer_type=pulsar.ConsumerType.Shared)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        msgs = []
+        for _ in range(5):
+            msg = await consumer.receive()
+            msgs.append(msg)
+
+        await consumer.acknowledge(msgs[0])
+        await consumer.acknowledge(msgs[2])
+        await consumer.acknowledge(msgs[4])
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub, 
+                                                
consumer_type=pulsar.ConsumerType.Shared)
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-1')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+
+    async def test_multi_topic_consumer(self):
+        topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+        producers = []
+
+        for topic in topics:
+            producer = await self._client.create_producer(topic)
+            producers.append(producer)
+
+        consumer = await self._client.subscribe(topics, 
'test-multi-subscription')
+
+        await producers[0].send(b'message-from-topic-1')
+        await producers[1].send(b'message-from-topic-2')
+
+        async def verify_receive(consumer: Consumer):
+            received_messages = {}
+            for _ in range(2):
+                msg = await consumer.receive()
+                received_messages[msg.data()] = None
+                await consumer.acknowledge(msg.message_id())
+            self.assertEqual(received_messages, {
+                b'message-from-topic-1': None,
+                b'message-from-topic-2': None
+            })
+
+        await verify_receive(consumer)
+        await consumer.close()
+
+        consumer = await 
self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+                                                'test-multi-subscription-2',
+                                                is_pattern_topic=True,
+                                                
initial_position=pulsar.InitialPosition.Earliest)
+        await verify_receive(consumer)
+        await consumer.close()
+
+    async def test_unsubscribe(self):
+        topic = f'asyncio-test-unsubscribe-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        await consumer.unsubscribe()
+        consumer = await self._client.subscribe(topic, sub)
+
+    async def test_seek_message_id(self):
+        topic = f'asyncio-test-seek-message-id-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(
+            topic, sub, initial_position=pulsar.InitialPosition.Earliest
+        )
+
+        producer = await self._client.create_producer(topic)
+        msg_ids = await self._prepare_messages(producer)
+
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+
+        await consumer.seek(msg_ids[2])
+
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')

Review Comment:
   The seek operation seeks to msg_ids[2], but the test expects to receive 
'msg-3' afterward. According to Pulsar's seek semantics, seeking to a message 
ID typically means the next receive will start from the message after that ID. 
However, this expectation depends on whether the subscription was configured 
with start_message_id_inclusive. The test should either verify this is the 
intended behavior or add a comment explaining the seek semantics being tested.



##########
tests/asyncio_test.py:
##########
@@ -58,29 +63,181 @@ async def test_batch_send(self):
             self.assertEqual(msg_ids[i].entry_id(), entry_id)
             self.assertEqual(msg_ids[i].batch_index(), i)
 
+        consumer = await self._client.subscribe(topic, 'sub',
+                                                
initial_position=pulsar.InitialPosition.Earliest)
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+        await consumer.close()
+
+        # create a different subscription to verify initial position is latest 
by default
+        consumer = await self._client.subscribe(topic, 'sub2')
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
     async def test_create_producer_failure(self):
         try:
-            await 
self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+            await 
self._client.create_producer('tenant/ns/asyncio-test-send-failure')
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.Timeout)
 
     async def test_send_failure(self):
-        producer = await 
self._client.create_producer('awaitio-test-send-failure')
+        producer = await 
self._client.create_producer('asyncio-test-send-failure')
         try:
             await producer.send(('x' * 1024 * 1024 * 10).encode())
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
 
     async def test_close_producer(self):
-        producer = await 
self._client.create_producer('awaitio-test-close-producer')
+        producer = await 
self._client.create_producer('asyncio-test-close-producer')
         await producer.close()
         try:
             await producer.close()
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
 
+    async def _prepare_messages(self, producer: Producer) -> 
List[pulsar.MessageId]:
+        msg_ids = []
+        for i in range(5):
+            msg_id = await producer.send(f'msg-{i}'.encode())
+            msg_ids.append(msg_id)
+        return msg_ids
+
+    async def test_consumer_cumulative_acknowledge(self):
+        topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        last_msg = None
+        for _ in range(5):
+            last_msg = await consumer.receive()
+        await consumer.acknowledge_cumulative(last_msg)
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub)
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
+    async def test_consumer_individual_acknowledge(self):
+        topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub, 
+                                                
consumer_type=pulsar.ConsumerType.Shared)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        msgs = []
+        for _ in range(5):
+            msg = await consumer.receive()
+            msgs.append(msg)
+
+        await consumer.acknowledge(msgs[0])
+        await consumer.acknowledge(msgs[2])
+        await consumer.acknowledge(msgs[4])
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub, 
+                                                
consumer_type=pulsar.ConsumerType.Shared)
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-1')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+
+    async def test_multi_topic_consumer(self):
+        topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+        producers = []
+
+        for topic in topics:
+            producer = await self._client.create_producer(topic)
+            producers.append(producer)
+
+        consumer = await self._client.subscribe(topics, 
'test-multi-subscription')
+
+        await producers[0].send(b'message-from-topic-1')
+        await producers[1].send(b'message-from-topic-2')
+
+        async def verify_receive(consumer: Consumer):
+            received_messages = {}
+            for _ in range(2):
+                msg = await consumer.receive()
+                received_messages[msg.data()] = None
+                await consumer.acknowledge(msg.message_id())
+            self.assertEqual(received_messages, {
+                b'message-from-topic-1': None,
+                b'message-from-topic-2': None
+            })
+
+        await verify_receive(consumer)
+        await consumer.close()
+
+        consumer = await 
self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+                                                'test-multi-subscription-2',
+                                                is_pattern_topic=True,
+                                                
initial_position=pulsar.InitialPosition.Earliest)
+        await verify_receive(consumer)
+        await consumer.close()
+
+    async def test_unsubscribe(self):
+        topic = f'asyncio-test-unsubscribe-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        await consumer.unsubscribe()
+        consumer = await self._client.subscribe(topic, sub)

Review Comment:
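   The consumer returned by the second subscribe call is assigned but never
used or closed; close it so the test does not leave a consumer open.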
   ```suggestion
           consumer = await self._client.subscribe(topic, sub)
           await consumer.close()
   ```



##########
pulsar/asyncio.py:
##########
@@ -145,12 +401,239 @@ async def create_producer(self, topic: str) -> Producer:
         ------
         PulsarException
         """
+        if schema is None:
+            schema = pulsar.schema.BytesSchema()
+
         future = asyncio.get_running_loop().create_future()
         conf = _pulsar.ProducerConfiguration()
-        # TODO: add more configs
-        self._client.create_producer_async(topic, conf, 
functools.partial(_set_future, future))
+        conf.send_timeout_millis(send_timeout_millis)
+        conf.compression_type(compression_type)
+        conf.max_pending_messages(max_pending_messages)
+        
conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions)
+        conf.block_if_queue_full(block_if_queue_full)
+        conf.batching_enabled(batching_enabled)
+        conf.batching_max_messages(batching_max_messages)
+        
conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes)
+        conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
+        conf.partitions_routing_mode(message_routing_mode)
+        conf.batching_type(batching_type)
+        conf.chunking_enabled(chunking_enabled)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
+        conf.access_mode(access_mode)
+        if message_router is not None:
+            def underlying_router(msg, num_partitions):
+                return int(message_router(pulsar.Message._wrap(msg),
+                                          num_partitions))
+            conf.message_router(underlying_router)
+
+        if producer_name:
+            conf.producer_name(producer_name)
+        if initial_sequence_id is not None:
+            conf.initial_sequence_id(initial_sequence_id)

Review Comment:
   The conditional check for initial_sequence_id on line 431 should use "is not
None" instead of just checking truthiness. With a truthiness check, a user who
passes initial_sequence_id=0 has the value treated as falsy and never set,
which is incorrect. The sync version (line 870 of pulsar/__init__.py) uses the
same truthiness pattern and has the same issue, but it should still be fixed
here. (The hunk quoted above already shows the "is not None" form, so this
comment appears to refer to an earlier revision of that line.)



##########
src/consumer.cc:
##########
@@ -89,21 +90,66 @@ void Consumer_seek(Consumer& consumer, const MessageId& 
msgId) {
     waitForAsyncResult([msgId, &consumer](ResultCallback callback) { 
consumer.seekAsync(msgId, callback); });
 }
 
+MessageId Consumer_get_last_message_id(Consumer& consumer) {
+    MessageId msgId;
+    Result res;
+    Py_BEGIN_ALLOW_THREADS res = consumer.getLastMessageId(msgId);
+    Py_END_ALLOW_THREADS CHECK_RESULT(res);

Review Comment:
   Missing line break after Py_END_ALLOW_THREADS. The code has 
"Py_END_ALLOW_THREADS CHECK_RESULT(res);" on the same line, which is 
inconsistent with the common pattern used elsewhere in the codebase where these 
are typically on separate lines for better readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to