This is an automated email from the ASF dual-hosted git repository.
jai1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3b3457e Redeliver messages that can't be decrypted. (#3097)
3b3457e is described below
commit 3b3457e2253b9201c1fdd6826b52df12b17ef217
Author: Jai Asher <[email protected]>
AuthorDate: Fri Nov 30 01:33:19 2018 -0800
Redeliver messages that can't be decrypted. (#3097)
---
.../client/api/SimpleProducerConsumerTest.java | 133 +++++++++++++++++++++
.../apache/pulsar/client/impl/ConsumerImpl.java | 66 +++++-----
2 files changed, 170 insertions(+), 29 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index f6b8e1a..fb47c50 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -34,6 +35,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -2305,6 +2307,137 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
+
+ @Test(groups = "encryption")
+ public void testRedeliveryOfFailedMessages() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ class InvalidKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> metadata) {
+ return null;
+ }
+ }
+
+ /*
+ * Redelivery functionality guarantees that customer will get a chance
to process the message again.
+ * In case of shared subscription eventually every client will get a
chance to process the message, till one of them acks it.
+ *
+ * For client with Encryption enabled where in cases like a new
production rollout or a buggy client configuration, we might have a mismatch of
consumers
+ * - few which can decrypt, few which can't (due to errors or
cryptoReader not configured).
+ *
+ * In that case eventually all messages should be acked as long as
there is a single consumer who can decrypt the message.
+ *
+ * Consumer 1 - Can decrypt message
+ * Consumer 2 - Has invalid Reader configured.
+ * Consumer 3 - Has no reader configured.
+ *
+ */
+
+ String topicName = "persistent://my-property/my-ns/myrsa-topic1";
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
+
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ Consumer<byte[]> consumer1 =
pulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new
EncKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topicsPattern(topicName)
+ .subscriptionName("my-subscriber-name").cryptoKeyReader(new
InvalidKeyReader())
+ .subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ Consumer<byte[]> consumer3 =
pulsarClient.newConsumer().topicsPattern(topicName)
+
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared).ackTimeout(1,
TimeUnit.SECONDS).subscribe();
+
+ int numberOfMessages = 100;
+ String message = "my-message";
+ Set<String> messages = new HashSet(); // Since messages are in random
order
+ for (int i = 0; i<numberOfMessages; i++) {
+ producer.send((message + i).getBytes());
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ Message m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ for (int i = 0; i<numberOfMessages; i++) {
+ // All messages would be received by consumer 1
+ m = consumer1.receive();
+ messages.add(new String(m.getData()));
+ consumer1.acknowledge(m);
+ }
+
+ // Consuming from consumer 2 and 3 again just to be sure
+ // no message should be returned since they can't decrypt the message
+ m = consumer2.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+ m = consumer3.receive(3, TimeUnit.SECONDS);
+ assertNull(m);
+
+ // checking if all messages were received
+ for (int i = 0; i<numberOfMessages; i++) {
+ assertTrue(messages.contains((message + i)));
+ }
+
+ consumer1.close();
+ consumer2.close();
+ consumer3.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
@Test(groups = "encryption")
public void testEncryptionFailure() throws Exception {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 76c4f53..6ae925d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1107,44 +1107,52 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
// If KeyReader is not configured throw exception based on config param
if (conf.getCryptoKeyReader() == null) {
-
- if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME) {
- log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
- topic, subscription, consumerName);
- return payload.retain();
- } else if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.DISCARD) {
- log.warn(
- "[{}][{}][{}] Skipping decryption since
CryptoKeyReader interface is not implemented and config is set to discard",
- topic, subscription, consumerName);
- discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
- } else {
- log.error(
- "[{}][{}][{}] Message delivery failed since
CryptoKeyReader interface is not implemented to consume encrypted message",
- topic, subscription, consumerName);
+ switch (conf.getCryptoFailureAction()) {
+ case CONSUME:
+ log.warn("[{}][{}][{}] CryptoKeyReader interface is not
implemented. Consuming encrypted message.",
+ topic, subscription, consumerName);
+ return payload.retain();
+ case DISCARD:
+ log.warn(
+ "[{}][{}][{}] Skipping decryption since
CryptoKeyReader interface is not implemented and config is set to discard",
+ topic, subscription, consumerName);
+ discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
+ return null;
+ case FAIL:
+ MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
+ log.error(
+ "[{}][{}][{}][{}] Message delivery failed since
CryptoKeyReader interface is not implemented to consume encrypted message",
+ topic, subscription, consumerName, m);
+ unAckedMessageTracker.add(m);
+ return null;
}
- return null;
}
ByteBuf decryptedData = this.msgCrypto.decrypt(msgMetadata, payload,
conf.getCryptoKeyReader());
if (decryptedData != null) {
return decryptedData;
}
-
- if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME) {
- // Note, batch message will fail to consume even if config is set
to consume
- log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted
message since config is set to consume.",
- topic, subscription, consumerName, messageId);
- return payload.retain();
- } else if (conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.DISCARD) {
- log.warn("[{}][{}][{}][{}] Discarding message since decryption
failed and config is set to discard", topic,
- subscription, consumerName, messageId);
- discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
- } else {
- log.error("[{}][{}][{}][{}] Message delivery failed since unable
to decrypt incoming message", topic,
- subscription, consumerName, messageId);
+
+ switch (conf.getCryptoFailureAction()) {
+ case CONSUME:
+ // Note, batch message will fail to consume even if config is
set to consume
+ log.warn("[{}][{}][{}][{}] Decryption failed. Consuming
encrypted message since config is set to consume.",
+ topic, subscription, consumerName, messageId);
+ return payload.retain();
+ case DISCARD:
+ log.warn("[{}][{}][{}][{}] Discarding message since decryption
failed and config is set to discard", topic,
+ subscription, consumerName, messageId);
+ discardMessage(messageId, currentCnx,
ValidationError.DecryptionError);
+ return null;
+ case FAIL:
+ MessageId m = new MessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex);
+ log.error(
+ "[{}][{}][{}][{}] Message delivery failed since unable
to decrypt incoming message",
+ topic, subscription, consumerName, m);
+ unAckedMessageTracker.add(m);
+ return null;
}
return null;
-
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId,
MessageMetadata msgMetadata, ByteBuf payload,