This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 27e121f2456a4579985279adb2e98604ac114f95 Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri Oct 29 17:54:27 2021 -0700 Websocket should pass the encryption context to the consumers (#12539) --- .../websocket/proxy/ProxyPublishConsumeTest.java | 62 ++++++++++++++++++++++ .../websocket/proxy/SimpleConsumerSocket.java | 3 ++ .../apache/pulsar/websocket/ConsumerHandler.java | 1 + .../pulsar/websocket/data/ConsumerMessage.java | 3 ++ 4 files changed, 69 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 3e3cf9a..6386c51 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -29,12 +29,14 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -54,9 +56,11 @@ import javax.ws.rs.core.Response; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.MessageCrypto; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.impl.crypto.MessageCryptoBc; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.TopicType; @@ -935,6 +939,64 @@ public class 
ProxyPublishConsumeTest extends ProducerConsumerBase { } } + @Test(timeOut = 20000) + public void consumeEncryptedMessages() throws Exception { + final String subscription = "my-sub"; + final String topic = "my-property/my-ns/encrypted" + UUID.randomUUID(); + final String consumerUri = "ws://localhost:" + proxyServer.getListenPortHTTP().get() + + "/ws/v2/consumer/persistent/" + topic + "/" + subscription + "?cryptoFailureAction=CONSUME"; + final int messages = 10; + + WebSocketClient consumerClient = new WebSocketClient(); + SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket(); + + + final String rsaPublicKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRu [...] + final String rsaPrivateKeyData = "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQ [...] 
+ + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .defaultCryptoKeyReader(rsaPublicKeyData) + .addEncryptionKey("ws-consumer-a") + .create(); + + try { + consumerClient.start(); + ClientUpgradeRequest consumerRequest = new ClientUpgradeRequest(); + Future<Session> consumerFuture = consumerClient.connect(consumeSocket, URI.create(consumerUri), consumerRequest); + + assertTrue(consumerFuture.get().isOpen()); + assertEquals(consumeSocket.getReceivedMessagesCount(), 0); + + for (int i = 0; i < messages; i++) { + producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); + } + + producer.flush(); + consumeSocket.sendPermits(messages); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(consumeSocket.getReceivedMessagesCount(), messages)); + + for (JsonObject msg : consumeSocket.messages) { + assertTrue(msg.has("encryptionContext")); + JsonObject encryptionCtx = msg.getAsJsonObject("encryptionContext"); + JsonObject keys = encryptionCtx.getAsJsonObject("keys"); + assertTrue(keys.has("ws-consumer-a")); + + assertTrue(keys.getAsJsonObject("ws-consumer-a").has("keyValue")); + } + + // All messages have been received; wait for the subscription backlog to drop to 0 + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions() + .get(subscription).getMsgBacklog(), 0)); + + } finally { + stopWebSocketClient(consumerClient); + } + } + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java index 749bfdcd..b1a9908 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java @@ -44,6 +44,7 @@ public class SimpleConsumerSocket { private final CountDownLatch closeLatch; private Session session; private final ArrayList<String> consumerBuffer; + final ArrayList<JsonObject> messages; private final AtomicInteger receivedMessages = new AtomicInteger(); // Custom message handler to override standard message processing, if it's needed private SimpleConsumerMessageHandler customMessageHandler; @@ -51,6 +52,7 @@ public class SimpleConsumerSocket { public SimpleConsumerSocket() { this.closeLatch = new CountDownLatch(1); consumerBuffer = new ArrayList<>(); + this.messages = new ArrayList<>(); } public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedException { @@ -79,6 +81,7 @@ public class SimpleConsumerSocket { public synchronized void onMessage(String msg) throws JsonParseException, IOException { receivedMessages.incrementAndGet(); JsonObject message = new Gson().fromJson(msg, JsonObject.class); + this.messages.add(message); if (message.get(X_PULSAR_MESSAGE_ID) != null) { String messageId = message.get(X_PULSAR_MESSAGE_ID).getAsString(); consumerBuffer.add(messageId); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 54a5a62..0192188 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -160,6 +160,7 @@ public class ConsumerHandler extends AbstractWebSocketHandler { dm.properties = msg.getProperties(); dm.publishTime = DateFormatter.format(msg.getPublishTime()); dm.redeliveryCount = msg.getRedeliveryCount(); + dm.encryptionContext = msg.getEncryptionCtx().orElse(null); if (msg.getEventTime() != 0) { dm.eventTime = DateFormatter.format(msg.getEventTime()); } diff --git 
a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java index 9660c95..9091a7e 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerMessage.java @@ -22,6 +22,7 @@ import java.util.Map; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; +import org.apache.pulsar.common.api.EncryptionContext; @JsonInclude(Include.NON_NULL) public class ConsumerMessage { @@ -32,5 +33,7 @@ public class ConsumerMessage { public int redeliveryCount; public String eventTime; + public EncryptionContext encryptionContext; + public String key; }