Copilot commented on code in PR #276:
URL:
https://github.com/apache/pulsar-client-python/pull/276#discussion_r2634884483
##########
tests/pulsar_test.py:
##########
@@ -520,22 +543,28 @@ def verify_next_message(value: bytes):
producer.send(b"msg-2")
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto
failure action is DISCARD
+ producer.send_async(b"msg-3", None)
+ producer.send_async(b"msg-4", None)
+ producer.flush()
+
+ def verify_undecrypted_message(msg: pulsar.Message, i: int):
+ self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+ self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+ verify_encryption_context(msg.encryption_context(), True, 2 if i
>= 3 else -1)
# Encrypted messages will be consumed since the crypto failure action
is CONSUME
consumer = client.subscribe(topic, 'another-sub',
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = consumer.receive(3000)
- self.assertNotEqual(msg.data(), f"msg-{i}".encode())
- self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+ verify_undecrypted_message(msg, i)
reader = client.create_reader(topic, MessageId.earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = reader.read_next(3000)
- self.assertNotEqual(msg.data(), f"msg-{i}".encode())
- self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+ verify_undecrypted_message(msg, i)
Review Comment:
The loop iterates only 3 times but there are 5 messages sent (msg-0, msg-1,
msg-2, msg-3, msg-4). This should iterate 5 times using range(5) to verify all
messages including the batched messages msg-3 and msg-4.
##########
pulsar/__init__.py:
##########
@@ -250,6 +366,15 @@ def producer_name(self) -> str:
"""
return self._message.producer_name()
+ def encryption_context(self) -> EncryptionContext:
Review Comment:
The return type annotation should be Optional[EncryptionContext] instead of
EncryptionContext since the method can return None when the message is not
encrypted, as documented in the docstring and shown in the implementation.
```suggestion
def encryption_context(self) -> Optional[EncryptionContext]:
```
##########
pulsar/__init__.py:
##########
@@ -166,6 +166,122 @@ def wrap(cls, msg_id: _pulsar.MessageId):
self._msg_id = msg_id
return self
+
+class EncryptionKey:
+ """
+ The key used for encryption.
+ """
+
+ def __init__(self, key: _pulsar.EncryptionKey):
+ """
+ Create EncryptionKey instance.
+
+ Parameters
+ ----------
+ key: _pulsar.EncryptionKey
+ The underlying EncryptionKey instance from the C extension.
+ """
+ self._key = key
+
+ @property
+ def key(self) -> str:
+ """
+ Returns the key, which is usually the key file's name.
+ """
+ return self._key.key
+
+ @property
+ def value(self) -> bytes:
+ """
+ Returns the value, which is usually the key bytes used for encryption.
+ """
+ return self._key.value()
+
+ @property
+ def metadata(self) -> dict:
+ """
+ Returns the metadata associated with the key.
+ """
+ return self._key.metadata
+
+ def __str__(self) -> str:
+ return f"EncryptionKey(key={self.key}, value_len={len(self.value)},
metadata={self.metadata})"
+
+ def __repr__(self) -> str:
+ return self.__str__()
+
+
+class EncryptionContext:
+ """
+ It contains encryption and compression information in it using which
application can decrypt
+ consumed message with encrypted-payload.
+ """
+
+ def __init__(self, context: _pulsar.EncryptionContext):
+ """
+ Create EncryptionContext instance.
+
+ Parameters
+ ----------
+ key: _pulsar.EncryptionContext
Review Comment:
The parameter name in the docstring is incorrect. It says "key:
_pulsar.EncryptionContext" but should be "context: _pulsar.EncryptionContext"
to match the actual parameter name.
```suggestion
context: _pulsar.EncryptionContext
```
##########
tests/pulsar_test.py:
##########
@@ -489,15 +490,37 @@ def test_encryption_failure(self):
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption-failure-" +
str(time.time())
producer = client.create_producer(
- topic=topic, encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader
+ topic=topic, encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader,
+ compression_type=CompressionType.LZ4
)
producer.send(b"msg-0")
+ enc_key = None
+ def verify_encryption_context(context: pulsar.EncryptionContext,
failed: bool, batch_size: int):
+ nonlocal enc_key
+ keys = context.keys()
+ self.assertEqual(len(keys), 1)
+ key = keys[0]
+ self.assertEqual(key.key, "client-rsa.pem")
+ self.assertTrue(len(key.value) > 0)
+ if enc_key is None:
+ enc_key = key.value
+ else:
+ self.assertEqual(key.value, enc_key)
+ self.assertEqual(key.metadata, {})
+ self.assertTrue(len(context.param()) > 0)
Review Comment:
assertTrue(a > b) cannot provide an informative message. Using
assertGreater(a, b) instead will give more informative messages.
```suggestion
self.assertGreater(len(key.value), 0)
if enc_key is None:
enc_key = key.value
else:
self.assertEqual(key.value, enc_key)
self.assertEqual(key.metadata, {})
self.assertGreater(len(context.param()), 0)
```
##########
tests/pulsar_test.py:
##########
@@ -520,22 +543,28 @@ def verify_next_message(value: bytes):
producer.send(b"msg-2")
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto
failure action is DISCARD
+ producer.send_async(b"msg-3", None)
+ producer.send_async(b"msg-4", None)
+ producer.flush()
+
+ def verify_undecrypted_message(msg: pulsar.Message, i: int):
+ self.assertNotEqual(msg.data(), f"msg-{i}".encode())
+ self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+ verify_encryption_context(msg.encryption_context(), True, 2 if i
>= 3 else -1)
# Encrypted messages will be consumed since the crypto failure action
is CONSUME
consumer = client.subscribe(topic, 'another-sub',
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = consumer.receive(3000)
- self.assertNotEqual(msg.data(), f"msg-{i}".encode())
- self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
+ verify_undecrypted_message(msg, i)
Review Comment:
The loop iterates only 3 times but there are 5 messages sent (msg-0, msg-1,
msg-2, msg-3, msg-4). This should iterate 5 times using range(5) to verify all
messages including the batched messages msg-3 and msg-4.
##########
tests/pulsar_test.py:
##########
@@ -489,15 +490,37 @@ def test_encryption_failure(self):
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption-failure-" +
str(time.time())
producer = client.create_producer(
- topic=topic, encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader
+ topic=topic, encryption_key="client-rsa.pem",
crypto_key_reader=crypto_key_reader,
+ compression_type=CompressionType.LZ4
)
producer.send(b"msg-0")
+ enc_key = None
+ def verify_encryption_context(context: pulsar.EncryptionContext,
failed: bool, batch_size: int):
+ nonlocal enc_key
+ keys = context.keys()
+ self.assertEqual(len(keys), 1)
+ key = keys[0]
+ self.assertEqual(key.key, "client-rsa.pem")
+ self.assertTrue(len(key.value) > 0)
+ if enc_key is None:
+ enc_key = key.value
+ else:
+ self.assertEqual(key.value, enc_key)
+ self.assertEqual(key.metadata, {})
+ self.assertTrue(len(context.param()) > 0)
Review Comment:
assertTrue(a > b) cannot provide an informative message. Using
assertGreater(a, b) instead will give more informative messages.
```suggestion
self.assertGreater(len(context.param()), 0)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]