This is an automated email from the ASF dual-hosted git repository.

jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b3457e  Redeliver messages that can't be decrypted. (#3097)
3b3457e is described below

commit 3b3457e2253b9201c1fdd6826b52df12b17ef217
Author: Jai Asher <[email protected]>
AuthorDate: Fri Nov 30 01:33:19 2018 -0800

    Redeliver messages that can't be decrypted. (#3097)
---
 .../client/api/SimpleProducerConsumerTest.java     | 133 +++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  66 +++++-----
 2 files changed, 170 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f6b8e1a..fb47c50 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -34,6 +35,7 @@ import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -2305,6 +2307,137 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         consumer.close();
         log.info("-- Exiting {} test --", methodName);
     }
+    
+    @Test(groups = "encryption")
+    public void testRedeliveryOfFailedMessages() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+        
+        class InvalidKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> metadata) {
+                return null;
+            }
+        }
+        
+        /*
+         * Redelivery functionality guarantees that customer will get a chance 
to process the message again.
+         * In case of shared subscription eventually every client will get a 
chance to process the message, till one of them acks it.
+         * 
+         * For client with Encryption enabled where in cases like a new 
production rollout or a buggy client configuration, we might have a mismatch of 
consumers 
+         * - few which can decrypt, few which can't (due to errors or 
cryptoReader not configured).
+         * 
+         * In that case eventually all messages should be acked as long as 
there is a single consumer who can decrypt the message.
+         * 
+         * Consumer 1 - Can decrypt message
+         * Consumer 2 - Has invalid Reader configured.
+         * Consumer 3 - Has no reader configured.
+         * 
+         */
+
+        String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+        
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
EncKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+        
+        Consumer<byte[]> consumer2 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                .subscriptionName("my-subscriber-name").cryptoKeyReader(new 
InvalidKeyReader())
+                .subscriptionType(SubscriptionType.Shared).ackTimeout(1, 
TimeUnit.SECONDS).subscribe();
+
+        Consumer<byte[]> consumer3 = 
pulsarClient.newConsumer().topicsPattern(topicName)
+                
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
 TimeUnit.SECONDS).subscribe();
+        
+        int numberOfMessages = 100;
+        String message = "my-message";
+        Set<String> messages = new HashSet(); // Since messages are in random 
order
+        for (int i = 0; i<numberOfMessages; i++) {
+            producer.send((message + i).getBytes());
+        }
+        
+        // Consuming from consumer 2 and 3 
+        // no message should be returned since they can't decrypt the message
+        Message m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        for (int i = 0; i<numberOfMessages; i++) {
+            // All messages would be received by consumer 1 
+            m = consumer1.receive();
+            messages.add(new String(m.getData()));
+            consumer1.acknowledge(m);
+        }
+        
+        // Consuming from consumer 2 and 3 again just to be sure 
+        // no message should be returned since they can't decrypt the message
+        m = consumer2.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        m = consumer3.receive(3, TimeUnit.SECONDS);
+        assertNull(m);
+        
+        // checking if all messages were received
+        for (int i = 0; i<numberOfMessages; i++) {
+            assertTrue(messages.contains((message + i)));
+        }
+        
+        consumer1.close();
+        consumer2.close();
+        consumer3.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
 
     @Test(groups = "encryption")
     public void testEncryptionFailure() throws Exception {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76c4f53..6ae925d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1107,44 +1107,52 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         // If KeyReader is not configured throw exception based on config param
         if (conf.getCryptoKeyReader() == null) {
-
-            if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME) {
-                log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
-                        topic, subscription, consumerName);
-                return payload.retain();
-            } else if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.DISCARD) {
-                log.warn(
-                        "[{}][{}][{}] Skipping decryption since 
CryptoKeyReader interface is not implemented and config is set to discard",
-                        topic, subscription, consumerName);
-                discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
-            } else {
-                log.error(
-                        "[{}][{}][{}] Message delivery failed since 
CryptoKeyReader interface is not implemented to consume encrypted message",
-                        topic, subscription, consumerName);
+            switch (conf.getCryptoFailureAction()) {
+                case CONSUME:
+                    log.warn("[{}][{}][{}] CryptoKeyReader interface is not 
implemented. Consuming encrypted message.",
+                            topic, subscription, consumerName);
+                    return payload.retain();
+                case DISCARD:
+                    log.warn(
+                            "[{}][{}][{}] Skipping decryption since 
CryptoKeyReader interface is not implemented and config is set to discard",
+                            topic, subscription, consumerName);
+                    discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
+                    return null;
+                case FAIL:
+                    MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
+                    log.error(
+                            "[{}][{}][{}][{}] Message delivery failed since 
CryptoKeyReader interface is not implemented to consume encrypted message",
+                             topic, subscription, consumerName, m);
+                    unAckedMessageTracker.add(m);
+                    return null;
             }
-            return null;
         }
 
         ByteBuf decryptedData = this.msgCrypto.decrypt(msgMetadata, payload, 
conf.getCryptoKeyReader());
         if (decryptedData != null) {
             return decryptedData;
         }
-
-        if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME) {
-            // Note, batch message will fail to consume even if config is set 
to consume
-            log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted 
message since config is set to consume.",
-                    topic, subscription, consumerName, messageId);
-            return payload.retain();
-        } else if (conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.DISCARD) {
-            log.warn("[{}][{}][{}][{}] Discarding message since decryption 
failed and config is set to discard", topic,
-                    subscription, consumerName, messageId);
-            discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
-        } else {
-            log.error("[{}][{}][{}][{}] Message delivery failed since unable 
to decrypt incoming message", topic,
-                    subscription, consumerName, messageId);
+        
+        switch (conf.getCryptoFailureAction()) {
+            case CONSUME:
+                // Note, batch message will fail to consume even if config is 
set to consume
+                log.warn("[{}][{}][{}][{}] Decryption failed. Consuming 
encrypted message since config is set to consume.",
+                        topic, subscription, consumerName, messageId);
+                return payload.retain();
+            case DISCARD:
+                log.warn("[{}][{}][{}][{}] Discarding message since decryption 
failed and config is set to discard", topic,
+                        subscription, consumerName, messageId);
+                discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError);
+                return null;
+            case FAIL:
+                MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
+                log.error(
+                        "[{}][{}][{}][{}] Message delivery failed since unable 
to decrypt incoming message",
+                         topic, subscription, consumerName, m);
+                unAckedMessageTracker.add(m);
+                return null;
         }
         return null;
-
     }
 
     private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, 
MessageMetadata msgMetadata, ByteBuf payload,

Reply via email to