This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a631d6  Support deserializing a message id from bytes and topic (#259)
2a631d6 is described below

commit 2a631d60f97962990855fce1166f18553721d328
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Jun 8 19:56:15 2025 +0800

    Support deserializing a message id from bytes and topic (#259)
---
 pulsar/__init__.py   | 22 ++++++++++++++++++++--
 src/message.cc       |  4 ++++
 tests/pulsar_test.py | 24 ++++++++++++++++++++++++
 3 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index de8787f..8802493 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -131,12 +131,30 @@ class MessageId:
         return self._msg_id > other._msg_id
 
     @staticmethod
-    def deserialize(message_id_bytes):
+    def deserialize(message_id_bytes, topic: Optional[str] = None) -> _pulsar.MessageId:
         """
         Deserialize a message id object from a previously
         serialized bytes sequence.
+
+        Parameters
+        ----------
+        topic: str, optional
+            For multi-topics consumers, the topic name is required to deserialize the message id.
+
+            .. code-block:: python
+
+                msg = consumer.receive()
+                topic = msg.topic_name()
+                msg_id_bytes = msg.message_id().serialize()
+                # Store topic and msg_id_bytes somewhere
+                # Later, deserialize the message id
+                msg_id = MessageId.deserialize(msg_id_bytes, topic=topic)
+
         """
-        return _pulsar.MessageId.deserialize(message_id_bytes)
+        msg_id = _pulsar.MessageId.deserialize(message_id_bytes)
+        if topic is not None:
+            msg_id.topic_name(topic)
+        return msg_id
 
     @classmethod
     def wrap(cls, msg_id: _pulsar.MessageId):
diff --git a/src/message.cc b/src/message.cc
index dec6f05..dd263b6 100644
--- a/src/message.cc
+++ b/src/message.cc
@@ -72,6 +72,10 @@ void export_message(py::module_& m) {
         .def("entry_id", &MessageId::entryId)
         .def("batch_index", &MessageId::batchIndex)
         .def("partition", &MessageId::partition)
+        .def(
+            "topic_name",
+            [](MessageId& msgId, const std::string& topicName) { msgId.setTopicName(topicName); },
+            return_value_policy::copy)
         .def_property_readonly_static("earliest", [](object) { return MessageId::earliest(); })
         .def_property_readonly_static("latest", [](object) { return MessageId::latest(); })
         .def("serialize",
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 6c0c3f2..4e1c5fb 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -1994,6 +1994,30 @@ class PulsarTest(TestCase):
         self.assertEqual(consumer.consumer_name(), name)
         client.close()
 
+    def test_deserialize_msg_id_with_topic(self):
+        client = Client(self.serviceUrl)
+        topic1 = "deserialize-msg-id-with-topic1-" + str(time.time())
+        topic2 = "deserialize-msg-id-with-topic2-" + str(time.time())
+        consumer = client.subscribe([topic1, topic2], 'sub')
+        producer1 = client.create_producer(topic1)
+        producer2 = client.create_producer(topic2)
+        producer1.send(b"msg-1")
+        producer2.send(b"msg-2")
+
+        serialized_msg_ids = dict()
+        for _ in range(2):
+            msg = consumer.receive(TM)
+            serialized_msg_ids[msg.topic_name()] = msg.message_id().serialize()
+        for topic, serialized_msg_id in serialized_msg_ids.items():
+            deserialized_msg_id = MessageId.deserialize(serialized_msg_id, topic=topic)
+            consumer.acknowledge_cumulative(deserialized_msg_id)
+        consumer.close()
+
+        consumer = client.subscribe([topic1, topic2], 'sub')
+        producer1.send(b'msg-3')
+        msg = consumer.receive(TM)
+        self.assertEqual(msg.value(), b'msg-3')
+        client.close()
 
 if __name__ == "__main__":
     main()

Reply via email to