Copilot commented on code in PR #526:
URL: https://github.com/apache/pulsar-client-cpp/pull/526#discussion_r2591959599


##########
lib/ConsumerImpl.h:
##########
@@ -195,8 +195,15 @@ class ConsumerImpl : public ConsumerImplBase {
     bool isPriorEntryIndex(int64_t idx);
     void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, const 
BrokerConsumerStatsCallback&);
 
-    bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
-                                const proto::MessageMetadata& metadata, 
SharedBuffer& payload);
+    enum DecryptionResult
+    {
+        SUCCESS,
+        CONSUME_ENCRYPTED,
+        FAILED
+    };

Review Comment:
   [nitpick] `DecryptionResult` is declared as a plain (unscoped) enum. Looking 
at other enums in this file (e.g., `SeekStatus` at line 80, `ConsumerTopicType` 
at line 70), consider using a scoped enum for consistency and better type 
safety: `enum class DecryptionResult` instead of a plain enum. Call sites would 
then need to qualify the enumerators (e.g., `DecryptionResult::FAILED`).



##########
lib/ConsumerImpl.cc:
##########
@@ -549,24 +550,27 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
                                    proto::MessageMetadata& metadata, 
SharedBuffer& payload) {
     LOG_DEBUG(getName() << "Received Message -- Size: " << 
payload.readableBytes());
 
-    if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
-        // Message was discarded or not consumed due to decryption failure
-        return;
-    }
-
     if (!isChecksumValid) {
         // Message discarded for checksum error
         discardCorruptedMessage(cnx, msg.message_id(), 
CommandAck_ValidationError_ChecksumMismatch);
         return;
     }
 
-    auto redeliveryCount = msg.redelivery_count();
-    const bool isMessageUndecryptable =
-        metadata.encryption_keys_size() > 0 && 
!config_.getCryptoKeyReader().get() &&
-        config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::CONSUME;
+    auto encryptionContext = metadata.encryption_keys_size() > 0
+                                 ? optional<EncryptionContext>(std::in_place, 
metadata, false)
+                                 : std::nullopt;
+    const auto decryptionResult = decryptMessageIfNeeded(cnx, msg, 
encryptionContext, payload);
+    if (decryptionResult == FAILED) {
+        // Message was discarded or not consumed due to decryption failure
+        return;
+    } else if (decryptionResult == CONSUME_ENCRYPTED && 
encryptionContext.has_value()) {
+        // Message is encrypted, but we let the application consume it as-is
+        encryptionContext->isDecryptionFailed_ = true;

Review Comment:
   [nitpick] Directly modifying the private member `isDecryptionFailed_` via 
friend access breaks encapsulation. Consider adding a setter method 
`setDecryptionFailed(bool value)` in the `EncryptionContext` class to make the 
design more maintainable and explicit about this intended mutation.
   ```suggestion
           encryptionContext->setDecryptionFailed(true);
   ```



##########
include/pulsar/EncryptionContext.h:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "CompressionType.h"
+#include "defines.h"
+
+namespace pulsar {
+
+namespace proto {
+class MessageMetadata;
+}
+
+class Message;
+
+struct PULSAR_PUBLIC EncryptionKey {
+    std::string key;
+    std::string value;
+    std::unordered_map<std::string, std::string> metadata;
+
+    explicit EncryptionKey() = default;

Review Comment:
   [nitpick] The `explicit` keyword on a default constructor is rarely needed: 
its only effect is to disallow copy-list-initialization from empty braces 
(e.g., `EncryptionKey k = {};`). Consider removing it: `EncryptionKey() = default;`. 
Note that the line itself must be kept, because declaring the other constructor 
suppresses the implicitly-declared default constructor.
   ```suggestion
       EncryptionKey() = default;
   ```



##########
include/pulsar/Message.h:
##########
@@ -202,6 +204,12 @@ class PULSAR_PUBLIC Message {
      */
     const std::string& getProducerName() const noexcept;
 
+    /**
+     * @return the optional encryption context that is present when the 
message is encrypted, the pointer is
+     * valid as the Message instance is alive

Review Comment:
   The documentation says "the pointer is valid as the Message instance is 
alive", but the phrasing is awkward. Consider: "the pointer is valid as long as 
the Message instance is alive" or "the pointer remains valid for the lifetime 
of the Message instance".
   ```suggestion
        * @return the optional encryption context that is present when the 
message is encrypted. The pointer
        * remains valid for the lifetime of the Message instance.
   ```



##########
include/pulsar/EncryptionContext.h:
##########
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <cstdint>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "CompressionType.h"
+#include "defines.h"
+
+namespace pulsar {
+
+namespace proto {
+class MessageMetadata;
+}
+
+class Message;
+
+struct PULSAR_PUBLIC EncryptionKey {
+    std::string key;
+    std::string value;
+    std::unordered_map<std::string, std::string> metadata;
+
+    explicit EncryptionKey() = default;
+
+    EncryptionKey(const std::string& key, const std::string& value,
+                  const decltype(EncryptionKey::metadata)& metadata)
+        : key(key), value(value), metadata(metadata) {}
+};
+
+/**
+ * It contains encryption and compression information in it using which 
application can decrypt consumed
+ * message with encrypted-payload.
+ */
+class PULSAR_PUBLIC EncryptionContext {
+   public:
+    using KeysType = std::vector<EncryptionKey>;
+
+    /**
+     * @return the map of encryption keys used for the message
+     */
+    const KeysType& keys() const noexcept { return keys_; }
+
+    /**
+     * @return the encryption parameter used for the message
+     */
+    const std::string& param() const noexcept { return param_; }
+
+    /**
+     * @return the encryption algorithm used for the message
+     */
+    const std::string& algorithm() const noexcept { return algorithm_; }
+
+    /**
+     * @return the compression type used for the message
+     */
+    CompressionType compressionType() const noexcept { return 
compressionType_; }
+
+    /**
+     * @return the uncompressed message size if the message is compressed, 0 
otherwise
+     */
+    uint32_t uncompressedMessageSize() const noexcept { return 
uncompressedMessageSize_; }
+
+    /**
+     * @return the batch size if the message is part of a batch, -1 otherwise
+     */
+    int32_t batchSize() const noexcept { return batchSize_; }
+
+    /**
+     * When the `ConsumerConfiguration#getCryptoFailureAction` is set to 
`CONSUME`, the message will still be
+     * returned even if the decryption failed, in this case, the message 
payload is still not decrypted but
+     * users have no way to know that. This method is provided to let users 
know whether the decryption
+     * failed.
+     *
+     * @return whether the decryption failed
+     */
+    bool isDecryptionFailed() const noexcept { return isDecryptionFailed_; }
+
+    /**
+     * It should be used only internally but it's exposed so that 
`std::make_optional` can construct the
+     * object in place with this constructor.

Review Comment:
   The comment "It should be used only internally but it's exposed so that 
`std::make_optional` can construct the object in place with this constructor" 
is misleading. The constructor is actually used with `std::in_place` (not 
`std::make_optional`) as seen in ConsumerImpl.cc:560. Consider updating to: 
"This constructor is public to allow in-place construction via std::optional 
(e.g., `std::optional<EncryptionContext>(std::in_place, metadata, false)`), but 
should not be used directly in application code."
   ```suggestion
        * This constructor is public to allow in-place construction via 
std::optional
        * (e.g., `std::optional<EncryptionContext>(std::in_place, metadata, 
false)`),
        * but should not be used directly in application code.
   ```



##########
tests/EncryptionTest.cc:
##########
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
+#include <pulsar/MessageBatch.h>
+
+#include <optional>
+#include <stdexcept>
+
+#include "lib/CompressionCodec.h"
+#include "lib/MessageCrypto.h"
+#include "lib/SharedBuffer.h"
+
+static std::string lookupUrl = "pulsar://localhost:6650";
+
+using namespace pulsar;
+
+static CryptoKeyReaderPtr getDefaultCryptoKeyReader() {
+    return std::make_shared<DefaultCryptoKeyReader>(TEST_CONF_DIR 
"/public-key.client-rsa.pem",
+                                                    TEST_CONF_DIR 
"/private-key.client-rsa.pem");
+}
+
+static std::vector<std::string> decryptValue(const char* data, size_t length,
+                                             std::optional<const 
EncryptionContext*> context) {
+    if (!context.has_value()) {
+        return {std::string(data, length)};
+    }
+    if (!context.value()->isDecryptionFailed()) {
+        return {std::string(data, length)};
+    }
+
+    MessageCrypto crypto{"test", false};
+    SharedBuffer decryptedPayload;
+    auto originalPayload = SharedBuffer::copy(data, length);
+    if (!crypto.decrypt(*context.value(), originalPayload, 
getDefaultCryptoKeyReader(), decryptedPayload)) {
+        throw std::runtime_error("Decryption failed");
+    }
+
+    SharedBuffer uncompressedPayload;
+    if (!CompressionCodecProvider::getCodec(context.value()->compressionType())
+             .decode(decryptedPayload, 
context.value()->uncompressedMessageSize(), uncompressedPayload)) {
+        throw std::runtime_error("Decompression failed");
+    }
+
+    std::vector<std::string> values;
+    if (auto batchSize = context.value()->batchSize(); batchSize > 0) {
+        MessageBatch batch;
+        for (auto&& msg : batch.parseFrom(uncompressedPayload, 
batchSize).messages()) {
+            values.emplace_back(msg.getDataAsString());
+        }
+    } else {
+        // non-batched message
+        values.emplace_back(uncompressedPayload.data(), 
uncompressedPayload.readableBytes());
+    }
+    return values;
+}
+
+static void testDecryption(Client& client, const std::string& topic, bool 
decryptionSucceed,
+                           int numMessageReceived) {

Review Comment:
   The parameter name `decryptionSucceed` should use the correct verb form: 
`decryptionSucceeds` or a clearer name like `shouldDecrypt` or `withDecryption` 
to better indicate it's a boolean flag controlling whether decryption is 
configured.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to