This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2704dd7  feat: add async redeliver supprot (#285)
2704dd7 is described below

commit 2704dd70441e88fa63a2fcce3bc384f6688af30a
Author: Nikolas Achatz <[email protected]>
AuthorDate: Tue Jan 27 20:19:09 2026 -0800

    feat: add async redeliver supprot (#285)
---
 pulsar/asyncio.py     | 11 +++++++++++
 tests/asyncio_test.py | 38 ++++++++++++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 064e353..7c3f14d 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -399,6 +399,17 @@ class Consumer:
         self._consumer.close_async(functools.partial(_set_future, future, 
value=None))
         await future
 
+    def redeliver_unacknowledged_messages(self):
+        """
+        Redelivers all the unacknowledged messages. In failover mode, the
+        request is ignored if the consumer is not active for the given topic. 
In
+        shared mode, the consumer's messages to be redelivered are distributed
+        across all the connected consumers. This is a non-blocking call and
+        doesn't throw an exception. In case the connection breaks, the messages
+        are redelivered after reconnect.
+        """
+        self._consumer.redeliver_unacknowledged_messages()
+
     def topic(self) -> str:
         """
         Return the topic this consumer is subscribed to.
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 66ff0fd..4fadde2 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -267,6 +267,44 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         await verify_receive(consumer)
         await consumer.close()
 
+    async def test_async_dead_letter_policy(self):
+        topic = f'asyncio-test-dlq-{time.time()}'
+        dlq_topic = 'dlq-' + topic
+        max_redeliver_count = 5
+
+        dlq_consumer = await self._client.subscribe(dlq_topic, "my-sub", 
consumer_type=pulsar.ConsumerType.Shared)
+        consumer = await self._client.subscribe(topic, "my-sub", 
consumer_type=pulsar.ConsumerType.Shared,
+                                    
dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(max_redeliver_count, 
dlq_topic, 'init-sub'))
+        producer = await self._client.create_producer(topic)
+
+        # Sen num msgs.
+        num = 10
+        for i in range(num):
+            await producer.send(b"hello-%d" % i)
+        await producer.flush()
+
+        # Redelivery all messages maxRedeliverCountNum time.
+        for i in range(1, num * max_redeliver_count + num + 1):
+            msg = await consumer.receive()
+            if i % num == 0:
+                consumer.redeliver_unacknowledged_messages()
+                print(f"Start redeliver msgs '{i}'")
+
+        with self.assertRaises(asyncio.TimeoutError):
+            await asyncio.wait_for(consumer.receive(), 0.1)
+
+        for i in range(num):
+            msg = await dlq_consumer.receive()
+            self.assertTrue(msg)
+            self.assertEqual(msg.data(), b"hello-%d" % i)
+            dlq_consumer.acknowledge(msg)
+
+        with self.assertRaises(asyncio.TimeoutError):
+            await asyncio.wait_for(dlq_consumer.receive(), 0.1)
+            
+        await consumer.close()
+        await dlq_consumer.close()
+
     async def test_unsubscribe(self):
         topic = f'asyncio-test-unsubscribe-{time.time()}'
         sub = 'sub'

Reply via email to