This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new d01ecb0 Enable peeking encrypted batch messages (#11244) d01ecb0 is described below commit d01ecb041847070eb1d011ebccadab5ee1d40ca3 Author: Masahiro Sakamoto <massa...@yahoo-corp.jp> AuthorDate: Thu Jul 8 05:54:11 2021 +0900 Enable peeking encrypted batch messages (#11244) --- .../broker/admin/impl/PersistentTopicsBase.java | 1 + .../apache/pulsar/broker/admin/AdminApiTest.java | 43 ++++++++++++++++++++++ .../pulsar/client/admin/internal/TopicsImpl.java | 11 +++++- 3 files changed, 54 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index d85ea53..a3c4c3e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2519,6 +2519,7 @@ public class PersistentTopicsBase extends AdminResource { responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg())); responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId())); } + responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0); // Decode if needed CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 5c91131..f16d2a6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageRoutingMode; @@ -2988,6 +2989,48 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest { final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString(); admin.topics().createNonPartitionedTopic(topicName); assertThrows(() -> {admin.topics().truncate(topicName);}); + } + + @Test(timeOut = 20000) + public void testPeekEncryptedMessages() throws Exception { + final String topicName = "persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID().toString(); + final String subName = "my-sub"; + + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subName, MessageId.latest); + + final Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .enableBatching(true) + .addEncryptionKey("my-app-key") + .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem") + .create(); + + for (int i = 0; i < 5; i++) { + producer.send(("message-" + i).getBytes()); + } + producer.close(); + + final List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, subName, 5); + assertEquals(peekedMessages.size(), 5); + + final Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topicName) + .subscriptionName(subName) + .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) + .subscribe(); + final List<Message<byte[]>> receivedMessages = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS); + receivedMessages.add(msg); + consumer.acknowledge(msg); + } + consumer.unsubscribe(); + + for (int i = 0; i < 5; i++) { + assertEquals(peekedMessages.get(i).getMessageId(), receivedMessages.get(i).getMessageId()); + assertEquals(peekedMessages.get(i).getData(), receivedMessages.get(i).getData()); + } } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index ae4de198..d122ded 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1535,8 +1535,17 @@ public class TopicsImpl extends BaseResource implements Topics { } tmp = headers.getFirst(BATCH_HEADER); - if (response.getHeaderString(BATCH_HEADER) != null) { + if (tmp != null) { properties.put(BATCH_HEADER, (String) tmp); + } + + boolean isEncrypted = false; + tmp = headers.getFirst("X-Pulsar-Is-Encrypted"); + if (tmp != null) { + isEncrypted = Boolean.parseBoolean(tmp.toString()); + } + + if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) { return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata); }