This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new d781707  feat: add async last message id support (#284)
d781707 is described below

commit d7817070215c7f84b3ca5f5481fe8e4c1a191575
Author: Nikolas Achatz <[email protected]>
AuthorDate: Tue Jan 27 21:07:08 2026 -0800

    feat: add async last message id support (#284)
---
 pulsar/asyncio.py     |  9 +++++++++
 src/consumer.cc       |  8 +++++++-
 tests/asyncio_test.py | 16 ++++++++++++++++
 3 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py
index 7c3f14d..34377d6 100644
--- a/pulsar/asyncio.py
+++ b/pulsar/asyncio.py
@@ -399,6 +399,15 @@ class Consumer:
         self._consumer.close_async(functools.partial(_set_future, future, value=None))
         await future
 
+    async def get_last_message_id(self) -> _pulsar.MessageId:
+        """
+        Asynchronously get the last message id.
+        """
+        future = asyncio.get_running_loop().create_future()
+        self._consumer.get_last_message_id_async(functools.partial(_set_future, future))
+        id = await future
+        return id
+
     def redeliver_unacknowledged_messages(self):
         """
         Redelivers all the unacknowledged messages. In failover mode, the
diff --git a/src/consumer.cc b/src/consumer.cc
index fa52720..f2854f9 100644
--- a/src/consumer.cc
+++ b/src/consumer.cc
@@ -106,6 +106,11 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) {
     return msgId;
 }
 
+void Consumer_get_last_message_id_async(Consumer& consumer, GetLastMessageIdCallback callback) {
+    py::gil_scoped_release release;
+    consumer.getLastMessageIdAsync(callback);
+}
+
 void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
     py::gil_scoped_release release;
     consumer.receiveAsync(callback);
@@ -194,7 +199,8 @@ void export_consumer(py::module_& m) {
         .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
         .def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
         .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
-        .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id) 
+        .def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
+        .def("get_last_message_id_async", &Consumer_get_last_message_id_async)
         .def("close_async", &Consumer_closeAsync)
         .def("unsubscribe_async", &Consumer_unsubscribeAsync)
         .def("seek_async", &Consumer_seekAsync)
diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py
index 4fadde2..d1c923a 100644
--- a/tests/asyncio_test.py
+++ b/tests/asyncio_test.py
@@ -33,6 +33,7 @@ from unittest import (
 )
 
 import pulsar  # pylint: disable=import-error
+import _pulsar # pylint: disable=import-error
 from pulsar.asyncio import (  # pylint: disable=import-error
     Client,
     Consumer,
@@ -267,6 +268,21 @@ class AsyncioTest(IsolatedAsyncioTestCase):
         await verify_receive(consumer)
         await consumer.close()
 
+    async def test_consumer_get_last_message_id(self):
+        topic = f'asyncio-test-get-last-message-id-{time.time()}'
+        sub = 'sub'
+        consumer = await self._client.subscribe(topic, sub,
+                                                consumer_type=pulsar.ConsumerType.Shared)
+        producer = await self._client.create_producer(topic)
+        for i in range(5):
+            msg = f'msg-{i}'.encode()
+            await producer.send(msg)
+            last_msg_id = await consumer.get_last_message_id()
+            assert isinstance(last_msg_id, _pulsar.MessageId)
+            assert last_msg_id.entry_id() == i
+            await consumer.acknowledge(last_msg_id)
+        await consumer.close()
+
     async def test_async_dead_letter_policy(self):
         topic = f'asyncio-test-dlq-{time.time()}'
         dlq_topic = 'dlq-' + topic

Reply via email to