This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 27e121f2456a4579985279adb2e98604ac114f95
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Fri Oct 29 17:54:27 2021 -0700

    Websocket should pass the encryption context to the consumers (#12539)
---
 .../websocket/proxy/ProxyPublishConsumeTest.java   | 62 ++++++++++++++++++++++
 .../websocket/proxy/SimpleConsumerSocket.java      |  3 ++
 .../apache/pulsar/websocket/ConsumerHandler.java   |  1 +
 .../pulsar/websocket/data/ConsumerMessage.java     |  3 ++
 4 files changed, 69 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 3e3cf9a..6386c51 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -29,12 +29,14 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -54,9 +56,11 @@ import javax.ws.rs.core.Response;
 
 import lombok.Cleanup;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageCrypto;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.TopicType;
@@ -935,6 +939,64 @@ public class ProxyPublishConsumeTest extends 
ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = 20000)
+    public void consumeEncryptedMessages() throws Exception {
+        final String subscription = "my-sub";
+        final String topic = "my-property/my-ns/encrypted" + UUID.randomUUID();
+        final String consumerUri = "ws://localhost:" + 
proxyServer.getListenPortHTTP().get() +
+                "/ws/v2/consumer/persistent/" + topic + "/" + subscription + 
"?cryptoFailureAction=CONSUME";
+        final int messages = 10;
+
+        WebSocketClient consumerClient = new WebSocketClient();
+        SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
+
+
+        final String rsaPublicKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu
 [...]
+        final String rsaPrivateKeyData = 
"data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ
 [...]
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .defaultCryptoKeyReader(rsaPublicKeyData)
+                .addEncryptionKey("ws-consumer-a")
+                .create();
+
+        try {
+            consumerClient.start();
+            ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest();
+            Future<Session> consumerFuture = 
consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest);
+
+            assertTrue(consumerFuture.get().isOpen());
+            assertEquals(consumeSocket.getReceivedMessagesCount(), 0);
+
+            for (int i = 0; i < messages; i++) {
+                
producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+            }
+
+            producer.flush();
+            consumeSocket.sendPermits(messages);
+            Awaitility.await().untilAsserted(() ->
+                    
Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages));
+
+            for (JsonObject msg : consumeSocket.messages) {
+                assertTrue(msg.has("encryptionContext"));
+                JsonObject encryptionCtx = 
msg.getAsJsonObject("encryptionContext");
+                JsonObject keys = encryptionCtx.getAsJsonObject("keys");
+                assertTrue(keys.has("ws-consumer-a"));
+
+                
assertTrue(keys.getAsJsonObject("ws-consumer-a").has("keyValue"));
+            }
+
+            // The message should not be acked since we only acked 1 message 
of the batch message
+            Awaitility.await().untilAsserted(() ->
+                    
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions()
+                            .get(subscription).getMsgBacklog(), 0));
+
+        } finally {
+            stopWebSocketClient(consumerClient);
+        }
+    }
+
     private void verifyTopicStat(Client client, String baseUrl, String topic) {
         String statUrl = baseUrl + topic + "/stats";
         WebTarget webTarget = client.target(statUrl);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
index 749bfdcd..b1a9908 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java
@@ -44,6 +44,7 @@ public class SimpleConsumerSocket {
     private final CountDownLatch closeLatch;
     private Session session;
     private final ArrayList<String> consumerBuffer;
+    final ArrayList<JsonObject> messages;
     private final AtomicInteger receivedMessages = new AtomicInteger();
     // Custom message handler to override standard message processing, if it's 
needed
     private SimpleConsumerMessageHandler customMessageHandler;
@@ -51,6 +52,7 @@ public class SimpleConsumerSocket {
     public SimpleConsumerSocket() {
         this.closeLatch = new CountDownLatch(1);
         consumerBuffer = new ArrayList<>();
+        this.messages = new ArrayList<>();
     }
 
     public boolean awaitClose(int duration, TimeUnit unit) throws 
InterruptedException {
@@ -79,6 +81,7 @@ public class SimpleConsumerSocket {
     public synchronized void onMessage(String msg) throws JsonParseException, 
IOException {
         receivedMessages.incrementAndGet();
         JsonObject message = new Gson().fromJson(msg, JsonObject.class);
+        this.messages.add(message);
         if (message.get(X_PULSAR_MESSAGE_ID) != null) {
             String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString();
             consumerBuffer.add(messageId);
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
index 54a5a62..0192188 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java
@@ -160,6 +160,7 @@ public class ConsumerHandler extends 
AbstractWebSocketHandler {
             dm.properties = msg.getProperties();
             dm.publishTime = DateFormatter.format(msg.getPublishTime());
             dm.redeliveryCount = msg.getRedeliveryCount();
+            dm.encryptionContext = msg.getEncryptionCtx().orElse(null);
             if (msg.getEventTime() != 0) {
                 dm.eventTime = DateFormatter.format(msg.getEventTime());
             }
diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
index 9660c95..9091a7e 100644
--- 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
+++ 
b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonInclude.Include;
+import org.apache.pulsar.common.api.EncryptionContext;
 
 @JsonInclude(Include.NON_NULL)
 public class ConsumerMessage {
@@ -32,5 +33,7 @@ public class ConsumerMessage {
     public int redeliveryCount;
     public String eventTime;
 
+    public EncryptionContext encryptionContext;
+
     public String key;
 }

Reply via email to