This is an automated email from the ASF dual-hosted git repository.

baodi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 4629281  Support encryption context on Message (#276)
4629281 is described below

commit 4629281c4e1ea9928c6f2442d0aae5de826037ad
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Dec 23 16:31:21 2025 +0800

    Support encryption context on Message (#276)
    
    * Support encryption context on Message
    
    * revert unnecessary change
    
    * fix tests
    
    * fix docs
    
    * fix tests
    
    * improve tests
---
 pulsar/__init__.py   | 125 +++++++++++++++++++++++++++++++++++++++++++++++++++
 src/message.cc       |  17 ++++++-
 tests/pulsar_test.py |  54 +++++++++++++++++++---
 3 files changed, 188 insertions(+), 8 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 543cd0d..2375d16 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -166,6 +166,122 @@ class MessageId:
         self._msg_id = msg_id
         return self
 
+
+class EncryptionKey:
+    """
+    A key that was used to encrypt a message, as returned by `EncryptionContext.keys()`.
+    """
+
+    def __init__(self, key: _pulsar.EncryptionKey):
+        """
+        Create EncryptionKey instance.
+
+        Parameters
+        ----------
+        key: _pulsar.EncryptionKey
+            The underlying EncryptionKey instance from the C extension.
+        """
+        self._key = key
+
+    @property
+    def key(self) -> str:
+        """
+        Returns the key, which is usually the key file's name.
+        """
+        return self._key.key
+
+    @property
+    def value(self) -> bytes:
+        """
+        Returns the value, which is usually the key bytes used for encryption (a new bytes object per access).
+        """
+        return self._key.value()
+
+    @property
+    def metadata(self) -> dict:
+        """
+        Returns the metadata associated with the key.
+        """
+        return self._key.metadata
+
+    def __str__(self) -> str:
+        return f"EncryptionKey(key={self.key}, value_len={len(self.value)}, metadata={self.metadata})"
+
+    def __repr__(self) -> str:
+        return self.__str__()
+
+
+class EncryptionContext:
+    """
+    The encryption and compression information of a consumed message, which an application can use
+    to decrypt a message whose payload is still encrypted.
+    """
+
+    def __init__(self, context: _pulsar.EncryptionContext):
+        """
+        Create EncryptionContext instance.
+
+        Parameters
+        ----------
+        context: _pulsar.EncryptionContext
+            The underlying EncryptionContext instance from the C extension.
+        """
+        self._context = context
+
+    def keys(self) -> List[EncryptionKey]:
+        """
+        Returns the EncryptionKey instances that were used to encrypt the message.
+        """
+        keys = self._context.keys()
+        return [EncryptionKey(key) for key in keys]
+
+    def param(self) -> bytes:
+        """
+        Returns the encryption param bytes.
+        """
+        return self._context.param()
+
+    def algorithm(self) -> str:
+        """
+        Returns the encryption algorithm.
+        """
+        return self._context.algorithm()
+
+    def compression_type(self) -> CompressionType:
+        """
+        Returns the compression type of the message.
+        """
+        return self._context.compression_type()
+
+    def uncompressed_message_size(self) -> int:
+        """
+        Returns the uncompressed message size or 0 if the compression type is NONE.
+        """
+        return self._context.uncompressed_message_size()
+
+    def batch_size(self) -> int:
+        """
+        Returns the number of messages in the batch or -1 if the message is not batched.
+        """
+        return self._context.batch_size()
+
+    def is_decryption_failed(self) -> bool:
+        """
+        Returns whether decryption has failed for this message.
+        """
+        return self._context.is_decryption_failed()
+
+    def __str__(self) -> str:
+        return f"EncryptionContext(algorithm={self.algorithm()}, " \
+               f"compression_type={self.compression_type().name}, " \
+               f"uncompressed_message_size={self.uncompressed_message_size()}, " \
+               f"batch_size={self.batch_size()}, is_decryption_failed={self.is_decryption_failed()}, " \
+               f"keys=[{', '.join(str(key) for key in self.keys())}])"
+
+    def __repr__(self) -> str:
+        return self.__str__()
+
+
 class Message:
     """
     Message objects are returned by a consumer, either by calling `receive` or
@@ -250,6 +366,15 @@ class Message:
         """
         return self._message.producer_name()
 
+    def encryption_context(self) -> "EncryptionContext | None":
+        """Return this message's encryption context, or None if the message is not encrypted."""
+        context = self._message.encryption_context()
+        if context is None:
+            return None
+        wrapped = EncryptionContext(context)
+        wrapped._owner = self._message  # native context is borrowed from the native message: keep it alive
+        return wrapped
+
     @staticmethod
     def _wrap(_message):
         self = Message()
diff --git a/src/message.cc b/src/message.cc
index e18861a..f3247e6 100644
--- a/src/message.cc
+++ b/src/message.cc
@@ -86,6 +86,20 @@ void export_message(py::module_& m) {
              })
         .def_static("deserialize", &MessageId::deserialize);
 
+    class_<EncryptionKey>(m, "EncryptionKey")
+        .def_readonly("key", &EncryptionKey::key)
+        .def("value", [](const EncryptionKey& key) { return bytes(key.value); 
})
+        .def_readonly("metadata", &EncryptionKey::metadata);
+
+    class_<EncryptionContext>(m, "EncryptionContext")
+        .def("keys", &EncryptionContext::keys)
+        .def("param", [](const EncryptionContext& context) { return 
bytes(context.param()); })
+        .def("algorithm", &EncryptionContext::algorithm, 
return_value_policy::copy)
+        .def("compression_type", &EncryptionContext::compressionType)
+        .def("uncompressed_message_size", 
&EncryptionContext::uncompressedMessageSize)
+        .def("batch_size", &EncryptionContext::batchSize)
+        .def("is_decryption_failed", &EncryptionContext::isDecryptionFailed);
+
     class_<Message>(m, "Message")
         .def(init<>())
         .def("properties", &Message::getProperties)
@@ -106,7 +120,8 @@ void export_message(py::module_& m) {
         .def("redelivery_count", &Message::getRedeliveryCount)
         .def("int_schema_version", &Message::getLongSchemaVersion)
         .def("schema_version", &Message::getSchemaVersion, 
return_value_policy::copy)
-        .def("producer_name", &Message::getProducerName, 
return_value_policy::copy);
+        .def("producer_name", &Message::getProducerName, 
return_value_policy::copy)
+        .def("encryption_context", &Message::getEncryptionContext, 
return_value_policy::reference);
 
     MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const 
std::string& payload,
                                                                uint32_t 
batchSize) = &MessageBatch::parseFrom;
diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py
index 3603d84..b7f38ed 100755
--- a/tests/pulsar_test.py
+++ b/tests/pulsar_test.py
@@ -167,6 +167,7 @@ class PulsarTest(TestCase):
         consumer.acknowledge(msg)
         print("receive from {}".format(msg.message_id()))
         self.assertEqual(msg_id, msg.message_id())
+        self.assertIsNone(msg.encryption_context())
         client.close()
 
     def test_producer_access_mode_exclusive(self):
@@ -489,15 +490,36 @@ class PulsarTest(TestCase):
         client = Client(self.serviceUrl)
         topic = "my-python-test-end-to-end-encryption-failure-" + 
str(time.time())
         producer = client.create_producer(
-            topic=topic, encryption_key="client-rsa.pem", 
crypto_key_reader=crypto_key_reader
+            topic=topic, encryption_key="client-rsa.pem", 
crypto_key_reader=crypto_key_reader,
+            compression_type=CompressionType.LZ4
         )
         producer.send(b"msg-0")
 
+        def verify_encryption_context(context: pulsar.EncryptionContext | 
None, failed: bool, batch_size: int):
+            if context is None:
+                self.fail("Encryption context is None")
+            keys = context.keys()
+            self.assertEqual(len(keys), 1)
+            key = keys[0]
+            self.assertEqual(key.key, "client-rsa.pem")
+            self.assertGreater(len(key.value), 0)
+            self.assertEqual(key.metadata, {})
+            self.assertGreater(len(context.param()), 0)
+            self.assertEqual(context.algorithm(), "")
+            self.assertEqual(context.compression_type(), CompressionType.LZ4)
+            if batch_size == -1:
+                self.assertEqual(context.uncompressed_message_size(), 
len(b"msg-0"))
+            else:
+                self.assertGreater(context.uncompressed_message_size(), 
len(b"msg-0"))
+            self.assertEqual(context.batch_size(), batch_size)
+            self.assertEqual(context.is_decryption_failed(), failed)
+
         def verify_next_message(value: bytes):
             consumer = client.subscribe(topic, subscription,
                                         crypto_key_reader=crypto_key_reader)
             msg = consumer.receive(3000)
             self.assertEqual(msg.data(), value)
+            verify_encryption_context(msg.encryption_context(), False, -1)
             consumer.acknowledge(msg)
             consumer.close()
 
@@ -520,22 +542,40 @@ class PulsarTest(TestCase):
 
         producer.send(b"msg-2")
         verify_next_message(b"msg-2") # msg-1 is skipped since the crypto 
failure action is DISCARD
+        producer.close()
+
+        # send batched messages
+        producer = client.create_producer(
+            topic=topic,
+            encryption_key="client-rsa.pem",
+            crypto_key_reader=crypto_key_reader,
+            compression_type=CompressionType.LZ4,
+            batching_enabled=True,
+        )
+        producer.send_async(b"msg-3", None)
+        producer.send_async(b"msg-4", None)
+        producer.flush()
+
+        def verify_undecrypted_message(msg: pulsar.Message, i: int):
+            self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+            self.assertGreater(len(msg.data()), 5, f"msg.data() is 
{msg.data()}")
+            verify_encryption_context(msg.encryption_context(), True, 2 if i 
>= 3 else -1)
 
         # Encrypted messages will be consumed since the crypto failure action 
is CONSUME
+        # Only 4 messages can be received because msg-3 and msg-4 are sent in 
batch and they are delivered
+        # as a single message when decryption fails.
         consumer = client.subscribe(topic, 'another-sub',
                                     initial_position=InitialPosition.Earliest,
                                     
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
-        for i in range(3):
+        for i in range(4):
             msg = consumer.receive(3000)
-            self.assertNotEqual(msg.data(), f"msg-{i}".encode())
-            self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+            verify_undecrypted_message(msg, i)
 
         reader = client.create_reader(topic, MessageId.earliest,
                                       
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
-        for i in range(3):
+        for i in range(4):
             msg = reader.read_next(3000)
-            self.assertNotEqual(msg.data(), f"msg-{i}".encode())
-            self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+            verify_undecrypted_message(msg, i)
 
         client.close()
 

Reply via email to