This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new 36d6fd6 feat: add individual negative acknowledgement for async
consumer (#282)
36d6fd6 is described below
commit 36d6fd63478fbf9c5fa348bdd14cc913c2b98bc1
Author: Nikolas Achatz <[email protected]>
AuthorDate: Sat Jan 17 03:04:36 2026 -0700
feat: add individual negative acknowledgement for async consumer (#282)
---
pulsar/asyncio.py | 25 +++++++++++++++++++++++++
src/consumer.cc | 12 ++++++++++++
tests/asyncio_test.py | 30 ++++++++++++++++++++++++++++++
3 files changed, 67 insertions(+)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 5c3178a..064e353 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -320,6 +320,31 @@ class Consumer:
)
await future
+ async def negative_acknowledge(
+ self,
+ message: Union[pulsar.Message, pulsar.MessageId, _pulsar.Message,
_pulsar.MessageId]
+ ) -> None:
+ """
+ Acknowledge the failure to process a single message asynchronously.
+
+ When a message is "negatively acked" it will be marked for redelivery
after
+ some fixed delay. The delay is configurable when constructing the
consumer
+ with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.
+ This call is not blocking.
+
+ Parameters
+ ----------
+ message:
+ The received message or message id.
+ """
+ if isinstance(message, pulsar.Message):
+ msg = message._message
+ elif isinstance(message, pulsar.MessageId):
+ msg = message._msg_id
+ else:
+ msg = message
+ await asyncio.to_thread(self._consumer.negative_acknowledge, msg)
+
async def unsubscribe(self) -> None:
"""
Unsubscribe the current consumer from the topic asynchronously.
diff --git a/src/consumer.cc b/src/consumer.cc
index f1d7367..fa52720 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -133,6 +133,16 @@ void
Consumer_acknowledgeCumulativeAsync_message_id(Consumer& consumer, const Me
consumer.acknowledgeCumulativeAsync(msgId, callback);
}
+void Consumer_negative_acknowledgeAsync(Consumer& consumer, const Message&
msg, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.negativeAcknowledge(msg);
+}
+
+void Consumer_negative_acknowledgeAsync_message_id(Consumer& consumer, const
MessageId& msgId, ResultCallback callback) {
+ py::gil_scoped_release release;
+ consumer.negativeAcknowledge(msgId);
+}
+
void Consumer_closeAsync(Consumer& consumer, ResultCallback callback) {
py::gil_scoped_release release;
consumer.closeAsync(callback);
@@ -183,6 +193,8 @@ void export_consumer(py::module_& m) {
.def("acknowledge_async", &Consumer_acknowledgeAsync_message_id)
.def("acknowledge_cumulative_async",
&Consumer_acknowledgeCumulativeAsync)
.def("acknowledge_cumulative_async",
&Consumer_acknowledgeCumulativeAsync_message_id)
+ .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
+ .def("negative_acknowledge_async",
&Consumer_negative_acknowledgeAsync_message_id)
.def("close_async", &Consumer_closeAsync)
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
.def("seek_async", &Consumer_seekAsync)
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 048dc43..66ff0fd 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -203,6 +203,36 @@ class AsyncioTest(IsolatedAsyncioTestCase):
msg = await consumer.receive()
self.assertEqual(msg.data(), b'msg-3')
+ async def test_consumer_negative_acknowledge(self):
+ topic = f'asyncio-test-consumer-negative-ack-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared,
+
negative_ack_redelivery_delay_ms=100)
+
+ producer = await self._client.create_producer(topic)
+ await self._prepare_messages(producer)
+ msgs = []
+ for _ in range(5):
+ msg = await consumer.receive()
+ msgs.append(msg)
+
+ await consumer.acknowledge(msgs[1])
+ await consumer.acknowledge(msgs[3])
+
+ await consumer.negative_acknowledge(msgs[0])
+ await consumer.negative_acknowledge(msgs[2])
+ await consumer.negative_acknowledge(msgs[4])
+ await asyncio.sleep(0.2)
+
+ received = []
+ for _ in range(3):
+ msg = await consumer.receive()
+ received.append(msg.data())
+
+ self.assertEqual(sorted(received), [b'msg-0', b'msg-2', b'msg-4'])
+ await consumer.close()
+
async def test_multi_topic_consumer(self):
topics = ['asyncio-test-multi-topic-1', 'asyncio-test-multi-topic-2']
producers = []