This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b07a403f0d62039b9c2a2a5a77198e9372dd4291 Author: Zike Yang <[email protected]> AuthorDate: Fri Jun 6 18:36:55 2025 +0800 [fix][client] Fix consumer not returning encrypted messages on decryption failure with compression enabled (#24356) ## Motivation There is a problem when a consumer tries to decrypt a compressed message. If a consumer fails to decrypt a compressed message, it also fails to decompress it, leading to the message being discarded. The user won't receive the message, even when using `ConsumerCryptoFailureAction.CONSUME`, because the failure occurs during decompression, not decryption. This issue can be easily reproduced by the test added in this PR: `testE2EEncryptionWithCompression`. We already have logic to skip decompression for messages that can't be decrypted: https://github.com/apache/pulsar/blob/397b0211cf1c9069d80daa691ca530e09bc37999/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1439-L1441 The main issue is that if the consumer fails to decrypt the message here: https://github.com/apache/pulsar/blob/397b0211cf1c9069d80daa691ca530e09bc37999/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1947, the message isn't treated as undecryptable and the consumer still tries to decompress it. ## Modification - Refactor `decryptPayloadIfNeeded` to return a clearer result, `DecryptResult`, to guide the logic on whether the message was decrypted successfully or should be discarded. - If decryption fails, treat the message as undecryptable and skip decompression.
(cherry picked from commit 9c954548b6636a3d5ef14eef05338c881daaa80d) --- .../client/api/SimpleProducerConsumerTest.java | 89 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ConsumerImpl.java | 70 +++++++++++++---- 2 files changed, 146 insertions(+), 13 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 9e35b4f262e..ddb418c37a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -5062,4 +5062,93 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { .createAsync().get(5, TimeUnit.SECONDS); assertNotNull(reader); } + + /** + * Creates a CryptoKeyReader that always returns keys from specified file paths, + * regardless of the key name provided. + * + * <p>This method creates a CryptoKeyReader instance that reads the public and + * private keys from fixed file paths. 
The key name and metadata provided to + * the getPublicKey and getPrivateKey methods are ignored, and the keys are + * always read from the specified paths.</p> + * + * @param publicKeyPath the file path to the public key + * @param privateKeyPath the file path to the private key + * @return a CryptoKeyReader that reads keys from the specified file paths + * @throws AssertionError if the key files are not present or not readable + */ + private static CryptoKeyReader createFixedFileCryptoKeyReader(String publicKeyPath, String privateKeyPath) { + return new CryptoKeyReader() { + final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo(); + + @Override + public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) { + if (Files.isReadable(Paths.get(publicKeyPath))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyPath))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + publicKeyPath); + } + } else { + Assert.fail("Certificate file " + publicKeyPath + " is not present or not readable."); + } + return null; + } + + @Override + public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) { + if (Files.isReadable(Paths.get(privateKeyPath))) { + try { + keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyPath))); + return keyInfo; + } catch (IOException e) { + Assert.fail("Failed to read certificate from " + privateKeyPath); + } + } else { + Assert.fail("Certificate file " + privateKeyPath + " is not present or not readable."); + } + return null; + } + }; + } + + @Test + public void testE2EEncryptionWithCompression() throws Exception { + final String topic = "persistent://my-property/my-ns/testE2EEncryptionWithCompression-" + UUID.randomUUID(); + + final var producer = pulsarClient.newProducer(Schema.STRING) + .topic(topic) + .addEncryptionKey("client-rsa.pem") + .cryptoKeyReader(createFixedFileCryptoKeyReader( + 
"./src/test/resources/certificate/public-key.client-rsa.pem", + "./src/test/resources/certificate/private-key.client-rsa.pem" + )) + //.compressionMinMsgBodySize(1) // enforce compression + .compressionType(CompressionType.LZ4) + .create(); + + final var consumer = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName("test") + .cryptoKeyReader(createFixedFileCryptoKeyReader( + "./src/test/resources/certificate/public-key.client-mismatch-rsa.pem", + "./src/test/resources/certificate/private-key.client-mismatch-rsa.pem" + )) + .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME) + .subscribe(); + + for (int i = 0; i < 10; i++) { + producer.send("message-" + i); + } + + for (int i = 0; i < 10; i++) { + final var msg = consumer.receive(5, TimeUnit.SECONDS); + assertNotNull(msg); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f575a127c96..b4a9ec610e8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1456,16 +1456,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return; } - ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload, + DecryptResult decryptResult = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload, cnx); - boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata); - - if (decryptedPayload == null) { + if (decryptResult.shouldDiscard()) { // Message was discarded or CryptoKeyReader isn't implemented return; } + boolean isMessageUndecryptable = !decryptResult.success; + + ByteBuf decryptedPayload = decryptResult.payload; + // uncompress decryptedPayload and release 
decryptedPayload-ByteBuf ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain() : uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true); @@ -1951,11 +1953,53 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return connectionHandler.lastConnectionClosedTimestamp; } - private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata, - ByteBuf payload, ClientCnx currentCnx) { + /** + * Represents the outcome of a message decryption attempt for the consumer. + */ + private static class DecryptResult { + private final boolean success; + private final ByteBuf payload; + + private DecryptResult(boolean success, ByteBuf decryptedPayload) { + this.success = success; + this.payload = decryptedPayload; + } + + /** + * Returns true if the message should be discarded and not delivered to the consumer user. + */ + public boolean shouldDiscard() { + return this.payload == null; + } + + /** + * Creates a result indicating decryption succeeded and the payload is ready for use. + */ + public static DecryptResult success(ByteBuf decryptedPayload) { + return new DecryptResult(true, decryptedPayload); + } + + /** + * Creates a result indicating decryption failed, but the message should still be delivered. + */ + public static DecryptResult failure(ByteBuf decryptedPayload) { + return new DecryptResult(false, decryptedPayload); + } + + /** + * Creates a result indicating the message should be discarded. 
+ */ + public static DecryptResult discard() { + return new DecryptResult(false, null); + } + } + + private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, + MessageMetadata msgMetadata, + ByteBuf payload, ClientCnx currentCnx) { if (msgMetadata.getEncryptionKeysCount() == 0) { - return payload.retain(); + return DecryptResult.success(payload.retain()); } int batchSize = msgMetadata.getNumMessagesInBatch(); // If KeyReader is not configured throw exception based on config param @@ -1969,7 +2013,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0, maxDecryptedSize); if (msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), nioDecryptedData, conf.getCryptoKeyReader())) { decryptedData.writerIndex(nioDecryptedData.limit()); - return decryptedData; + return DecryptResult.success(decryptedData); } decryptedData.release(); @@ -1977,7 +2021,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle return handleCryptoFailure(payload, messageId, currentCnx, redeliveryCount, batchSize, false); } - private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData messageId, ClientCnx currentCnx, + private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData messageId, ClientCnx currentCnx, int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) { switch (conf.getCryptoFailureAction()) { @@ -1990,7 +2034,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle log.warn("[{}][{}][{}][{}] Decryption failed. 
Consuming encrypted message since config is set to" + " consume.", topic, subscription, consumerName, messageId); } - return payload.retain(); + return DecryptResult.failure(payload.retain()); case DISCARD: if (cryptoReaderNotExist) { log.warn( @@ -2005,7 +2049,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle messageId.getBatchIndex()); } discardMessage(messageId, currentCnx, ValidationError.DecryptionError, batchSize); - return null; + return DecryptResult.discard(); case FAIL: if (cryptoReaderNotExist) { log.error( @@ -2020,11 +2064,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle } MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partitionIndex); unAckedMessageTracker.add(m, redeliveryCount); - return null; + return DecryptResult.discard(); default: log.warn("[{}][{}][{}] Invalid crypto failure state found, continue message consumption.", topic, subscription, consumerName); - return payload.retain(); + return DecryptResult.failure(payload.retain()); } }
