This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b07a403f0d62039b9c2a2a5a77198e9372dd4291
Author: Zike Yang <[email protected]>
AuthorDate: Fri Jun 6 18:36:55 2025 +0800

    [fix][client] Fix consumer not returning encrypted messages on decryption 
failure with compression enabled (#24356)
    
    ## Motivation
    
    There is a problem when a consumer receives a message that is both 
compressed and encrypted. If the consumer fails to decrypt the message, it 
still attempts to decompress the still-encrypted payload; that decompression 
fails and the message is discarded. The user won't receive the message, even 
when using `ConsumerCryptoFailureAction.CONSUME`, because the failure occurs 
during decompression, not decryption. This issue can be easily reproduced by 
the test added in this PR: `testE2EEncryptionWithCompression`.
    
    We already have logic to skip decompression for messages that can't be 
decrypted: 
https://github.com/apache/pulsar/blob/397b0211cf1c9069d80daa691ca530e09bc37999/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1439-L1441
    
    The main issue is that when decryption fails at the call site linked 
below, the message isn't treated as undecryptable, so the consumer still tries 
to decompress it: 
https://github.com/apache/pulsar/blob/397b0211cf1c9069d80daa691ca530e09bc37999/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1947
    
    ## Modification
    
    - Refactor `decryptPayloadIfNeeded` to return a `DecryptResult` instead of 
a bare `ByteBuf`, so the caller can tell whether the message was decrypted 
successfully, could not be decrypted but should still be delivered, or should 
be discarded.
    - If decryption fails, treat the message as undecryptable and skip 
decompression.
    
    (cherry picked from commit 9c954548b6636a3d5ef14eef05338c881daaa80d)
---
 .../client/api/SimpleProducerConsumerTest.java     | 89 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 70 +++++++++++++----
 2 files changed, 146 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 9e35b4f262e..ddb418c37a7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -5062,4 +5062,93 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                 .createAsync().get(5, TimeUnit.SECONDS);
         assertNotNull(reader);
     }
+
+    /**
+     * Creates a CryptoKeyReader that always returns keys from specified file 
paths,
+     * regardless of the key name provided.
+     *
+     * <p>This method creates a CryptoKeyReader instance that reads the public 
and
+     * private keys from fixed file paths. The key name and metadata provided 
to
+     * the getPublicKey and getPrivateKey methods are ignored, and the keys are
+     * always read from the specified paths.</p>
+     *
+     * @param publicKeyPath  the file path to the public key
+     * @param privateKeyPath the file path to the private key
+     * @return a CryptoKeyReader that reads keys from the specified file paths
+     * @throws AssertionError if the key files are not present or not readable
+     */
+    private static CryptoKeyReader createFixedFileCryptoKeyReader(String 
publicKeyPath, String privateKeyPath) {
+        return new CryptoKeyReader() {
+            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                if (Files.isReadable(Paths.get(publicKeyPath))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyPath)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
publicKeyPath);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + publicKeyPath + " is not 
present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                if (Files.isReadable(Paths.get(privateKeyPath))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyPath)));
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
privateKeyPath);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + privateKeyPath + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        };
+    }
+
+    @Test
+    public void testE2EEncryptionWithCompression() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/testE2EEncryptionWithCompression-" + 
UUID.randomUUID();
+
+        final var producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .addEncryptionKey("client-rsa.pem")
+                .cryptoKeyReader(createFixedFileCryptoKeyReader(
+                        
"./src/test/resources/certificate/public-key.client-rsa.pem",
+                        
"./src/test/resources/certificate/private-key.client-rsa.pem"
+                ))
+                //.compressionMinMsgBodySize(1) // enforce compression
+                .compressionType(CompressionType.LZ4)
+                .create();
+
+        final var consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test")
+                .cryptoKeyReader(createFixedFileCryptoKeyReader(
+                        
"./src/test/resources/certificate/public-key.client-mismatch-rsa.pem",
+                        
"./src/test/resources/certificate/private-key.client-mismatch-rsa.pem"
+                ))
+                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
+
+        for (int i = 0; i < 10; i++) {
+            producer.send("message-" + i);
+        }
+
+        for (int i = 0; i < 10; i++) {
+            final var msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertNotNull(msg);
+            consumer.acknowledge(msg);
+        }
+
+        producer.close();
+        consumer.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index f575a127c96..b4a9ec610e8 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1456,16 +1456,18 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             return;
         }
 
-        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, 
redeliveryCount, msgMetadata, headersAndPayload,
+        DecryptResult decryptResult = decryptPayloadIfNeeded(messageId, 
redeliveryCount, msgMetadata, headersAndPayload,
                 cnx);
 
-        boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
-
-        if (decryptedPayload == null) {
+        if (decryptResult.shouldDiscard()) {
             // Message was discarded or CryptoKeyReader isn't implemented
             return;
         }
 
+        boolean isMessageUndecryptable = !decryptResult.success;
+
+        ByteBuf decryptedPayload = decryptResult.payload;
+
         // uncompress decryptedPayload and release decryptedPayload-ByteBuf
         ByteBuf uncompressedPayload = (isMessageUndecryptable || 
isChunkedMessage) ? decryptedPayload.retain()
                 : uncompressPayloadIfNeeded(messageId, msgMetadata, 
decryptedPayload, cnx, true);
@@ -1951,11 +1953,53 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return connectionHandler.lastConnectionClosedTimestamp;
     }
 
-    private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int 
redeliveryCount, MessageMetadata msgMetadata,
-                                           ByteBuf payload, ClientCnx 
currentCnx) {
+    /**
+     * Represents the outcome of a message decryption attempt for the consumer.
+     */
+    private static class DecryptResult {
+        private final boolean success;
+        private final ByteBuf payload;
+
+        private DecryptResult(boolean success, ByteBuf decryptedPayload) {
+            this.success = success;
+            this.payload = decryptedPayload;
+        }
+
+        /**
+         * Returns true if the message should be discarded and not delivered 
to the consumer user.
+         */
+        public boolean shouldDiscard() {
+            return this.payload == null;
+        }
+
+        /**
+         * Creates a result indicating decryption succeeded and the payload is 
ready for use.
+         */
+        public static DecryptResult success(ByteBuf decryptedPayload) {
+            return new DecryptResult(true, decryptedPayload);
+        }
+
+        /**
+         * Creates a result indicating decryption failed, but the message 
should still be delivered.
+         */
+        public static DecryptResult failure(ByteBuf decryptedPayload) {
+            return new DecryptResult(false, decryptedPayload);
+        }
+
+        /**
+         * Creates a result indicating the message should be discarded.
+         */
+        public static DecryptResult discard() {
+            return new DecryptResult(false, null);
+        }
+    }
+
+    private DecryptResult decryptPayloadIfNeeded(MessageIdData messageId, int 
redeliveryCount,
+                                                 MessageMetadata msgMetadata,
+                                                 ByteBuf payload, ClientCnx 
currentCnx) {
 
         if (msgMetadata.getEncryptionKeysCount() == 0) {
-            return payload.retain();
+            return DecryptResult.success(payload.retain());
         }
         int batchSize = msgMetadata.getNumMessagesInBatch();
         // If KeyReader is not configured throw exception based on config param
@@ -1969,7 +2013,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0, 
maxDecryptedSize);
         if (msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), 
nioDecryptedData, conf.getCryptoKeyReader())) {
             decryptedData.writerIndex(nioDecryptedData.limit());
-            return decryptedData;
+            return DecryptResult.success(decryptedData);
         }
 
         decryptedData.release();
@@ -1977,7 +2021,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return handleCryptoFailure(payload, messageId, currentCnx, 
redeliveryCount, batchSize, false);
     }
 
-    private ByteBuf handleCryptoFailure(ByteBuf payload, MessageIdData 
messageId, ClientCnx currentCnx,
+    private DecryptResult handleCryptoFailure(ByteBuf payload, MessageIdData 
messageId, ClientCnx currentCnx,
             int redeliveryCount, int batchSize, boolean cryptoReaderNotExist) {
 
         switch (conf.getCryptoFailureAction()) {
@@ -1990,7 +2034,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 log.warn("[{}][{}][{}][{}] Decryption failed. Consuming 
encrypted message since config is set to"
                         + " consume.", topic, subscription, consumerName, 
messageId);
             }
-            return payload.retain();
+            return DecryptResult.failure(payload.retain());
         case DISCARD:
             if (cryptoReaderNotExist) {
                 log.warn(
@@ -2005,7 +2049,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                         messageId.getBatchIndex());
             }
             discardMessage(messageId, currentCnx, 
ValidationError.DecryptionError, batchSize);
-            return null;
+            return DecryptResult.discard();
         case FAIL:
             if (cryptoReaderNotExist) {
                 log.error(
@@ -2020,11 +2064,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
             }
             MessageId m = new MessageIdImpl(messageId.getLedgerId(), 
messageId.getEntryId(), partitionIndex);
             unAckedMessageTracker.add(m, redeliveryCount);
-            return null;
+            return DecryptResult.discard();
         default:
             log.warn("[{}][{}][{}] Invalid crypto failure state found, 
continue message consumption.", topic,
                     subscription, consumerName);
-            return payload.retain();
+            return DecryptResult.failure(payload.retain());
         }
     }
 

Reply via email to