This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git
The following commit(s) were added to refs/heads/main by this push:
new d781707 feat: add async last message id support (#284)
d781707 is described below
commit d7817070215c7f84b3ca5f5481fe8e4c1a191575
Author: Nikolas Achatz <[email protected]>
AuthorDate: Tue Jan 27 21:07:08 2026 -0800
feat: add async last message id support (#284)
---
pulsar/asyncio.py | 9 +++++++++
src/consumer.cc | 8 +++++++-
tests/asyncio_test.py | 16 ++++++++++++++++
3 files changed, 32 insertions(+), 1 deletion(-)
diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 7c3f14d..34377d6 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -399,6 +399,15 @@ class Consumer:
self._consumer.close_async(functools.partial(_set_future, future,
value=None))
await future
+ async def get_last_message_id(self) -> _pulsar.MessageId:
+ """
+ Asynchronously get the last message id.
+ """
+ future = asyncio.get_running_loop().create_future()
+
self._consumer.get_last_message_id_async(functools.partial(_set_future, future))
+ id = await future
+ return id
+
def redeliver_unacknowledged_messages(self):
"""
Redelivers all the unacknowledged messages. In failover mode, the
diff --git a/src/consumer.cc b/src/consumer.cc
index fa52720..f2854f9 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -106,6 +106,11 @@ MessageId Consumer_get_last_message_id(Consumer& consumer)
{
return msgId;
}
+void Consumer_get_last_message_id_async(Consumer& consumer,
GetLastMessageIdCallback callback) {
+ py::gil_scoped_release release;
+ consumer.getLastMessageIdAsync(callback);
+}
+
void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
py::gil_scoped_release release;
consumer.receiveAsync(callback);
@@ -194,7 +199,8 @@ void export_consumer(py::module_& m) {
.def("acknowledge_cumulative_async",
&Consumer_acknowledgeCumulativeAsync)
.def("acknowledge_cumulative_async",
&Consumer_acknowledgeCumulativeAsync_message_id)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
- .def("negative_acknowledge_async",
&Consumer_negative_acknowledgeAsync_message_id)
+ .def("negative_acknowledge_async",
&Consumer_negative_acknowledgeAsync_message_id)
+ .def("get_last_message_id_async", &Consumer_get_last_message_id_async)
.def("close_async", &Consumer_closeAsync)
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
.def("seek_async", &Consumer_seekAsync)
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 4fadde2..d1c923a 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -33,6 +33,7 @@ from unittest import (
)
import pulsar # pylint: disable=import-error
+import _pulsar # pylint: disable=import-error
from pulsar.asyncio import ( # pylint: disable=import-error
Client,
Consumer,
@@ -267,6 +268,21 @@ class AsyncioTest(IsolatedAsyncioTestCase):
await verify_receive(consumer)
await consumer.close()
+ async def test_consumer_get_last_message_id(self):
+ topic = f'asyncio-test-get-last-message-id-{time.time()}'
+ sub = 'sub'
+ consumer = await self._client.subscribe(topic, sub,
+
consumer_type=pulsar.ConsumerType.Shared)
+ producer = await self._client.create_producer(topic)
+ for i in range(5):
+ msg = f'msg-{i}'.encode()
+ await producer.send(msg)
+ last_msg_id = await consumer.get_last_message_id()
+ assert isinstance(last_msg_id, _pulsar.MessageId)
+ assert last_msg_id.entry_id() == i
+ await consumer.acknowledge(last_msg_id)
+ await consumer.close()
+
async def test_async_dead_letter_policy(self):
topic = f'asyncio-test-dlq-{time.time()}'
dlq_topic = 'dlq-' + topic