This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 2704dd7 feat: add async redeliver supprot (#285)
2704dd7 is described below
commit 2704dd70441e88fa63a2fcce3bc384f6688af30a
Author: Nikolas Achatz <[email protected]>
AuthorDate: Tue Jan 27 20:19:09 2026 -0800
feat: add async redeliver supprot (#285)
---
pulsar/asyncio.py | 11 +++++++++++
tests/asyncio_test.py | 38 ++++++++++++++++++++++++++++++++++++++
2 files changed, 49 insertions(+)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 064e353..7c3f14d 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -399,6 +399,17 @@ class Consumer:
self._consumer.close_async(functools.partial(_set_future, future,
value=None))
await future
+ def redeliver_unacknowledged_messages(self):
+ """
+ Redelivers all the unacknowledged messages. In failover mode, the
+ request is ignored if the consumer is not active for the given topic.
In
+ shared mode, the consumer's messages to be redelivered are distributed
+ across all the connected consumers. This is a non-blocking call and
+ doesn't throw an exception. In case the connection breaks, the messages
+ are redelivered after reconnect.
+ """
+ self._consumer.redeliver_unacknowledged_messages()
+
def topic(self) -> str:
"""
Return the topic this consumer is subscribed to.
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 66ff0fd..4fadde2 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -267,6 +267,44 @@ class AsyncioTest(IsolatedAsyncioTestCase):
await verify_receive(consumer)
await consumer.close()
+ async def test_async_dead_letter_policy(self):
+ topic = f'asyncio-test-dlq-{time.time()}'
+ dlq_topic = 'dlq-' + topic
+ max_redeliver_count = 5
+
+ dlq_consumer = await self._client.subscribe(dlq_topic, "my-sub",
consumer_type=pulsar.ConsumerType.Shared)
+ consumer = await self._client.subscribe(topic, "my-sub",
consumer_type=pulsar.ConsumerType.Shared,
+
dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(max_redeliver_count,
dlq_topic, 'init-sub'))
+ producer = await self._client.create_producer(topic)
+
+ # Sen num msgs.
+ num = 10
+ for i in range(num):
+ await producer.send(b"hello-%d" % i)
+ await producer.flush()
+
+ # Redelivery all messages maxRedeliverCountNum time.
+ for i in range(1, num * max_redeliver_count + num + 1):
+ msg = await consumer.receive()
+ if i % num == 0:
+ consumer.redeliver_unacknowledged_messages()
+ print(f"Start redeliver msgs '{i}'")
+
+ with self.assertRaises(asyncio.TimeoutError):
+ await asyncio.wait_for(consumer.receive(), 0.1)
+
+ for i in range(num):
+ msg = await dlq_consumer.receive()
+ self.assertTrue(msg)
+ self.assertEqual(msg.data(), b"hello-%d" % i)
+ dlq_consumer.acknowledge(msg)
+
+ with self.assertRaises(asyncio.TimeoutError):
+ await asyncio.wait_for(dlq_consumer.receive(), 0.1)
+
+ await consumer.close()
+ await dlq_consumer.close()
+
async def test_unsubscribe(self):
topic = f'asyncio-test-unsubscribe-{time.time()}'
sub = 'sub'