BewareMyPower commented on code in PR #277:
URL: https://github.com/apache/pulsar-client-python/pull/277#discussion_r2643196885


##########
tests/asyncio_test.py:
##########
@@ -58,29 +63,181 @@ async def test_batch_send(self):
             self.assertEqual(msg_ids[i].entry_id(), entry_id)
             self.assertEqual(msg_ids[i].batch_index(), i)
 
+        consumer = await self._client.subscribe(topic, 'sub',
+                                                initial_position=pulsar.InitialPosition.Earliest)
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+        await consumer.close()
+
+        # create a different subscription to verify initial position is latest by default
+        consumer = await self._client.subscribe(topic, 'sub2')
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
     async def test_create_producer_failure(self):
         try:
-            await self._client.create_producer('tenant/ns/awaitio-test-send-failure')
+            await self._client.create_producer('tenant/ns/asyncio-test-send-failure')
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.Timeout)
 
     async def test_send_failure(self):
-        producer = await self._client.create_producer('awaitio-test-send-failure')
+        producer = await self._client.create_producer('asyncio-test-send-failure')
         try:
             await producer.send(('x' * 1024 * 1024 * 10).encode())
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.MessageTooBig)
 
     async def test_close_producer(self):
-        producer = await self._client.create_producer('awaitio-test-close-producer')
+        producer = await self._client.create_producer('asyncio-test-close-producer')
         await producer.close()
         try:
             await producer.close()
             self.fail()
         except PulsarException as e:
             self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)
 
+    async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]:
+        msg_ids = []
+        for i in range(5):
+            msg_id = await producer.send(f'msg-{i}'.encode())
+            msg_ids.append(msg_id)
+        return msg_ids
+
+    async def test_consumer_cumulative_acknowledge(self):
+        topic = f'asyncio-test-consumer-cumulative-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        last_msg = None
+        for _ in range(5):
+            last_msg = await consumer.receive()
+        await consumer.acknowledge_cumulative(last_msg)
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub)
+        await producer.send(b'final-message')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'final-message')
+
+    async def test_consumer_individual_acknowledge(self):
+        topic = f'asyncio-test-consumer-individual-ack-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub, 
+                                                consumer_type=pulsar.ConsumerType.Shared)
+        producer = await self._client.create_producer(topic)
+        await self._prepare_messages(producer)
+        msgs = []
+        for _ in range(5):
+            msg = await consumer.receive()
+            msgs.append(msg)
+
+        await consumer.acknowledge(msgs[0])
+        await consumer.acknowledge(msgs[2])
+        await consumer.acknowledge(msgs[4])
+        await consumer.close()
+
+        consumer = await self._client.subscribe(topic, sub, 
+                                                consumer_type=pulsar.ConsumerType.Shared)
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-1')
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')
+
+    async def test_multi_topic_consumer(self):
+        topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
+        producers = []
+
+        for topic in topics:
+            producer = await self._client.create_producer(topic)
+            producers.append(producer)
+
+        consumer = await self._client.subscribe(topics, 'test-multi-subscription')
+
+        await producers[0].send(b'message-from-topic-1')
+        await producers[1].send(b'message-from-topic-2')
+
+        async def verify_receive(consumer: Consumer):
+            received_messages = {}
+            for _ in range(2):
+                msg = await consumer.receive()
+                received_messages[msg.data()] = None
+                await consumer.acknowledge(msg.message_id())
+            self.assertEqual(received_messages, {
+                b'message-from-topic-1': None,
+                b'message-from-topic-2': None
+            })
+
+        await verify_receive(consumer)
+        await consumer.close()
+
+        consumer = await self._client.subscribe('public/default/asyncio-test-multi-topic-.*',
+                                                'test-multi-subscription-2',
+                                                is_pattern_topic=True,
+                                                initial_position=pulsar.InitialPosition.Earliest)
+        await verify_receive(consumer)
+        await consumer.close()
+
+    async def test_unsubscribe(self):
+        topic = f'asyncio-test-unsubscribe-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub)
+        await consumer.unsubscribe()
+        consumer = await self._client.subscribe(topic, sub)
+
+    async def test_seek_message_id(self):
+        topic = f'asyncio-test-seek-message-id-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(
+            topic, sub, initial_position=pulsar.InitialPosition.Earliest
+        )
+
+        producer = await self._client.create_producer(topic)
+        msg_ids = await self._prepare_messages(producer)
+
+        for i in range(5):
+            msg = await consumer.receive()
+            self.assertEqual(msg.data(), f'msg-{i}'.encode())
+
+        await consumer.seek(msg_ids[2])
+
+        msg = await consumer.receive()
+        self.assertEqual(msg.data(), b'msg-3')

Review Comment:
   It's expected behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to