github-actions[bot] commented on code in PR #60044:
URL: https://github.com/apache/doris/pull/60044#discussion_r2890450824


##########
be/src/runtime/routine_load/aws_msk_iam_auth.cpp:
##########
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/routine_load/aws_msk_iam_auth.h"
+
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/platform/Environment.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
+#include <aws/sts/STSClient.h>
+#include <aws/sts/model/AssumeRoleRequest.h>
+#include <openssl/hmac.h>
+#include <openssl/sha.h>
+
+#include <algorithm>
+#include <chrono>
+#include <iomanip>
+#include <sstream>
+
+#include "common/logging.h"
+
+namespace doris {
+
+AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) {
+    _credentials_provider = _create_credentials_provider();
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider> 
AwsMskIamAuth::_create_credentials_provider() {
+    if (!_config.role_arn.empty() && !_config.access_key.empty() && 
!_config.secret_key.empty()) {
+        LOG(INFO) << "Using AWS STS Assume Role with explicit credentials 
(cross-account): "
+                  << _config.role_arn << " (Access Key ID: " << 
_config.access_key.substr(0, 4)
+                  << "****)";
+
+        Aws::Client::ClientConfiguration client_config;
+        if (!_config.region.empty()) {
+            client_config.region = _config.region;
+        }
+
+        // Use explicit AK/SK as base credentials to assume the role
+        Aws::Auth::AWSCredentials base_credentials(_config.access_key, 
_config.secret_key);
+        auto base_provider =
+                
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials);
+
+        auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, 
client_config);
+
+        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+                _config.role_arn, Aws::String(), /* external_id */ 
Aws::String(),
+                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+    }
+    // 2. Explicit AK/SK credentials (direct access)
+    if (!_config.access_key.empty() && !_config.secret_key.empty()) {
+        LOG(INFO) << "Using explicit AWS credentials (Access Key ID: "
+                  << _config.access_key.substr(0, 4) << "****)";
+
+        Aws::Auth::AWSCredentials credentials(_config.access_key, 
_config.secret_key);
+
+        return 
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials);
+    }
+    // 3. Assume Role with Instance Profile (for same-account access from 
within AWS)
+    if (!_config.role_arn.empty()) {
+        LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << 
_config.role_arn;
+
+        Aws::Client::ClientConfiguration client_config;
+        if (!_config.region.empty()) {
+            client_config.region = _config.region;
+        }
+
+        auto sts_client = std::make_shared<Aws::STS::STSClient>(
+                
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), 
client_config);
+
+        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+                _config.role_arn, Aws::String(), /* external_id */ 
Aws::String(),
+                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+    }
+    // 4. AWS Profile (reads from ~/.aws/credentials)
+    if (!_config.profile_name.empty()) {
+        LOG(INFO) << "Using AWS Profile: " << _config.profile_name;
+
+        return 
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+                _config.profile_name.c_str());
+    }
+    // 5. Custom Credentials Provider
+    if (!_config.credentials_provider.empty()) {
+        LOG(INFO) << "Using custom credentials provider: " << 
_config.credentials_provider;
+
+        // Parse credentials provider type string
+        std::string provider_upper = _config.credentials_provider;
+        std::transform(provider_upper.begin(), provider_upper.end(), 
provider_upper.begin(),
+                       ::toupper);
+
+        if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") {
+            return 
std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>();
+        } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == 
"INSTANCEPROFILE") {
+            return 
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+        } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") {
+            return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(
+                    
Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str());
+        } else if (provider_upper == "DEFAULT") {
+            return 
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+        } else {
+            LOG(WARNING) << "Unknown credentials provider type: " << 
_config.credentials_provider
+                         << ", falling back to default credentials provider 
chain";
+            return 
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+        }
+    }
+    // No valid credentials configuration found
+    LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please 
provide.";
+    return nullptr;
+}
+
+Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    // Refresh if needed
+    if (_should_refresh_credentials()) {
+        _cached_credentials = _credentials_provider->GetAWSCredentials();
+
+        if (_cached_credentials.GetAWSAccessKeyId().empty()) {
+            return Status::InternalError("Failed to get AWS credentials");
+        }
+
+        // Set expiry time (assume 1 hour for instance profile, or use the 
credentials expiration)
+        _credentials_expiry = std::chrono::system_clock::now() + 
std::chrono::hours(1);
+
+        LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication";
+    }
+
+    *credentials = _cached_credentials;
+    return Status::OK();
+}
+
+bool AwsMskIamAuth::_should_refresh_credentials() {
+    auto now = std::chrono::system_clock::now();
+    auto refresh_time =
+            _credentials_expiry - 
std::chrono::milliseconds(_config.token_refresh_margin_ms);
+    return now >= refresh_time || 
_cached_credentials.GetAWSAccessKeyId().empty();
+}
+
+Status AwsMskIamAuth::generate_token(const std::string& broker_hostname, 
std::string* token,
+                                     int64_t* token_lifetime_ms) {
+    Aws::Auth::AWSCredentials credentials;
+    RETURN_IF_ERROR(get_credentials(&credentials));
+
+    std::string timestamp = _get_timestamp();
+    std::string date_stamp = _get_date_stamp(timestamp);
+
+    // AWS MSK IAM token is a base64-encoded presigned URL
+    // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
+
+    // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM 
signer reference)
+    static constexpr int TOKEN_EXPIRY_SECONDS = 900;
+
+    // Build the endpoint URL
+    std::string endpoint_url = "https://kafka." + _config.region + 
".amazonaws.com/";
+
+    // Build credential scope
+    std::string credential_scope =
+            date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request";
+
+    // Build the canonical query string (sorted alphabetically)
+    // IMPORTANT: All query parameters must be included in the signature 
calculation
+    // Session Token must be in canonical query string if using temporary 
credentials
+    std::stringstream canonical_query_ss;
+    canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded :
+
+    // Add Algorithm
+    canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256";
+
+    // Add Credential
+    std::string credential = std::string(credentials.GetAWSAccessKeyId()) + 
"/" + credential_scope;
+    canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential);
+
+    // Add Date
+    canonical_query_ss << "&X-Amz-Date=" << timestamp;
+
+    // Add Expires
+    canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS;
+
+    // Add Security Token if present (MUST be before signature calculation)
+    if (!credentials.GetSessionToken().empty()) {
+        canonical_query_ss << "&X-Amz-Security-Token="
+                           << 
_url_encode(std::string(credentials.GetSessionToken()));
+    }
+
+    // Add SignedHeaders
+    canonical_query_ss << "&X-Amz-SignedHeaders=host";
+
+    std::string canonical_query_string = canonical_query_ss.str();
+
+    // Build the canonical headers
+    std::string host = "kafka." + _config.region + ".amazonaws.com";
+    std::string canonical_headers = "host:" + host + "\n";
+    std::string signed_headers = "host";
+
+    // Build the canonical request
+    std::string method = "GET";
+    std::string uri = "/";
+    std::string payload_hash = _sha256("");
+
+    std::string canonical_request = method + "\n" + uri + "\n" + 
canonical_query_string + "\n" +
+                                    canonical_headers + "\n" + signed_headers 
+ "\n" + payload_hash;
+
+    // Build the string to sign
+    std::string algorithm = "AWS4-HMAC-SHA256";
+    std::string canonical_request_hash = _sha256(canonical_request);
+    std::string string_to_sign =
+            algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" + 
canonical_request_hash;
+
+    // Calculate signature
+    std::string signing_key = 
_calculate_signing_key(std::string(credentials.GetAWSSecretKey()),
+                                                     date_stamp, 
_config.region, "kafka-cluster");
+    std::string signature = _hmac_sha256_hex(signing_key, string_to_sign);
+
+    // Build the final presigned URL
+    // All parameters are already in canonical_query_string, just add signature
+    // Then add User-Agent AFTER signature (not part of signed content, 
matching reference impl)
+    std::string signed_url = endpoint_url + "?" + canonical_query_string +
+                             "&X-Amz-Signature=" + signature +
+                             "&User-Agent=doris-msk-iam-auth%2F1.0";
+
+    // Base64url encode the signed URL (without padding)
+    *token = _base64url_encode(signed_url);
+
+    // Token lifetime in milliseconds
+    *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000;
+
+    LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url;
+
+    return Status::OK();
+}
+
+std::string AwsMskIamAuth::_hmac_sha256_hex(const std::string& key, const 
std::string& data) {
+    std::string raw = _hmac_sha256(key, data);
+    std::stringstream ss;
+    for (unsigned char c : raw) {
+        ss << std::hex << std::setw(2) << std::setfill('0') << 
static_cast<int>(c);
+    }
+    return ss.str();
+}
+
+std::string AwsMskIamAuth::_url_encode(const std::string& value) {
+    std::ostringstream escaped;
+    escaped.fill('0');
+    escaped << std::hex;
+
+    for (char c : value) {
+        // Keep alphanumeric and other accepted characters intact
+        if (isalnum(static_cast<unsigned char>(c)) || c == '-' || c == '_' || 
c == '.' ||
+            c == '~') {
+            escaped << c;
+        } else {
+            // Any other characters are percent-encoded
+            escaped << std::uppercase;
+            escaped << '%' << std::setw(2) << 
static_cast<int>(static_cast<unsigned char>(c));
+            escaped << std::nouppercase;
+        }
+    }
+
+    return escaped.str();
+}
+
+std::string AwsMskIamAuth::_base64url_encode(const std::string& input) {
+    // Standard base64 alphabet
+    static const char* base64_chars =
+            "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+
+    std::string result;
+    result.reserve(((input.size() + 2) / 3) * 4);
+
+    const unsigned char* bytes = reinterpret_cast<const unsigned 
char*>(input.c_str());
+    size_t len = input.size();
+
+    for (size_t i = 0; i < len; i += 3) {
+        uint32_t n = static_cast<uint32_t>(bytes[i]) << 16;
+        if (i + 1 < len) n |= static_cast<uint32_t>(bytes[i + 1]) << 8;
+        if (i + 2 < len) n |= static_cast<uint32_t>(bytes[i + 2]);
+
+        result += base64_chars[(n >> 18) & 0x3F];
+        result += base64_chars[(n >> 12) & 0x3F];
+        if (i + 1 < len) result += base64_chars[(n >> 6) & 0x3F];
+        if (i + 2 < len) result += base64_chars[n & 0x3F];
+    }
+
+    // Convert to URL-safe base64 (replace + with -, / with _)
+    // and remove padding (=)
+    for (char& c : result) {
+        if (c == '+')
+            c = '-';
+        else if (c == '/')
+            c = '_';
+    }
+
+    return result;
+}
+
+std::string AwsMskIamAuth::_calculate_signing_key(const std::string& 
secret_key,
+                                                  const std::string& 
date_stamp,
+                                                  const std::string& region,
+                                                  const std::string& service) {
+    std::string k_secret = "AWS4" + secret_key;
+    std::string k_date = _hmac_sha256(k_secret, date_stamp);
+    std::string k_region = _hmac_sha256(k_date, region);
+    std::string k_service = _hmac_sha256(k_region, service);
+    std::string k_signing = _hmac_sha256(k_service, "aws4_request");
+    return k_signing;
+}
+
+std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const 
std::string& data) {
+    unsigned char* digest;
+    digest = HMAC(EVP_sha256(), key.c_str(), static_cast<int>(key.length()),
+                  reinterpret_cast<const unsigned char*>(data.c_str()), 
data.length(), nullptr,

Review Comment:
   **Bug (Critical - Thread Safety)**: `HMAC()` with `nullptr` as the output 
buffer (6th argument, `md`) writes to a **process-global static buffer** that is 
shared across all threads. From the OpenSSL manual:
   
   > "If md is NULL, the digest is placed in a static array. Note: passing a 
NULL value for md to use the static array is not thread safe."
   
   The `_mutex` in `get_credentials()` does NOT protect this call -- 
`_hmac_sha256` is called from `generate_token()` -> `_calculate_signing_key()` 
and `_hmac_sha256_hex()`, which are outside the mutex scope. Multiple 
librdkafka background threads (one per consumer) can invoke 
`oauthbearer_token_refresh_cb` concurrently, causing a data race on the static 
buffer.
   
   Also, the return value of `HMAC()` is not checked for `NULL`. If it fails, 
constructing a `std::string` from a null pointer is undefined behavior.
   
   **Fix**: Use a caller-owned output buffer:
   ```cpp
   std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const 
std::string& data) {
       unsigned char digest[EVP_MAX_MD_SIZE];
       unsigned int digest_len = 0;
       unsigned char* result = HMAC(EVP_sha256(), key.c_str(), 
static_cast<int>(key.length()),
                                    reinterpret_cast<const unsigned 
char*>(data.c_str()),
                                    data.length(), digest, &digest_len);
       DORIS_CHECK(result != nullptr) << "HMAC-SHA256 computation failed";
       return {reinterpret_cast<char*>(digest), digest_len};
   }
   ```



##########
be/src/runtime/routine_load/aws_msk_iam_auth.h:
##########
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <librdkafka/rdkafkacpp.h>
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+
+#include "common/status.h"
+
+namespace doris {
+

Review Comment:
   **Missing compile_check headers**: Per coding standards, new `.h` files 
should include paired `compile_check_begin.h` / `compile_check_end.h`. See 
`data_consumer_group.h` in the same directory for the expected pattern:
   ```cpp
   namespace doris {
   #include "common/compile_check_begin.h"
   // ... class definitions ...
   #include "common/compile_check_end.h"
   } // namespace doris
   ```



##########
be/src/runtime/routine_load/aws_msk_iam_auth.cpp:
##########
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/routine_load/aws_msk_iam_auth.h"
+
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/platform/Environment.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
+#include <aws/sts/STSClient.h>
+#include <aws/sts/model/AssumeRoleRequest.h>
+#include <openssl/hmac.h>
+#include <openssl/sha.h>
+
+#include <algorithm>
+#include <chrono>
+#include <iomanip>
+#include <sstream>
+
+#include "common/logging.h"
+
+namespace doris {
+
+AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) {
+    _credentials_provider = _create_credentials_provider();
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider> 
AwsMskIamAuth::_create_credentials_provider() {
+    if (!_config.role_arn.empty() && !_config.access_key.empty() && 
!_config.secret_key.empty()) {
+        LOG(INFO) << "Using AWS STS Assume Role with explicit credentials 
(cross-account): "
+                  << _config.role_arn << " (Access Key ID: " << 
_config.access_key.substr(0, 4)
+                  << "****)";
+
+        Aws::Client::ClientConfiguration client_config;
+        if (!_config.region.empty()) {
+            client_config.region = _config.region;
+        }
+
+        // Use explicit AK/SK as base credentials to assume the role
+        Aws::Auth::AWSCredentials base_credentials(_config.access_key, 
_config.secret_key);
+        auto base_provider =
+                
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials);
+
+        auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, 
client_config);
+
+        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+                _config.role_arn, Aws::String(), /* external_id */ 
Aws::String(),
+                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+    }
+    // 2. Explicit AK/SK credentials (direct access)
+    if (!_config.access_key.empty() && !_config.secret_key.empty()) {
+        LOG(INFO) << "Using explicit AWS credentials (Access Key ID: "
+                  << _config.access_key.substr(0, 4) << "****)";
+
+        Aws::Auth::AWSCredentials credentials(_config.access_key, 
_config.secret_key);
+
+        return 
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials);
+    }
+    // 3. Assume Role with Instance Profile (for same-account access from 
within AWS)
+    if (!_config.role_arn.empty()) {
+        LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << 
_config.role_arn;
+
+        Aws::Client::ClientConfiguration client_config;
+        if (!_config.region.empty()) {
+            client_config.region = _config.region;
+        }
+
+        auto sts_client = std::make_shared<Aws::STS::STSClient>(
+                
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), 
client_config);
+
+        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+                _config.role_arn, Aws::String(), /* external_id */ 
Aws::String(),
+                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+    }
+    // 4. AWS Profile (reads from ~/.aws/credentials)
+    if (!_config.profile_name.empty()) {
+        LOG(INFO) << "Using AWS Profile: " << _config.profile_name;
+
+        return 
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+                _config.profile_name.c_str());
+    }
+    // 5. Custom Credentials Provider
+    if (!_config.credentials_provider.empty()) {
+        LOG(INFO) << "Using custom credentials provider: " << 
_config.credentials_provider;
+
+        // Parse credentials provider type string
+        std::string provider_upper = _config.credentials_provider;
+        std::transform(provider_upper.begin(), provider_upper.end(), 
provider_upper.begin(),
+                       ::toupper);
+
+        if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") {
+            return 
std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>();
+        } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == 
"INSTANCEPROFILE") {
+            return 
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+        } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") {
+            return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(
+                    
Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str());
+        } else if (provider_upper == "DEFAULT") {
+            return 
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+        } else {
+            LOG(WARNING) << "Unknown credentials provider type: " << 
_config.credentials_provider
+                         << ", falling back to default credentials provider 
chain";
+            return 
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+        }
+    }
+    // No valid credentials configuration found
+    LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please 
provide.";
+    return nullptr;
+}
+
+Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    // Refresh if needed
+    if (_should_refresh_credentials()) {
+        _cached_credentials = _credentials_provider->GetAWSCredentials();
+

Review Comment:
   **Bug (Crash)**: `_credentials_provider` can be `nullptr` if 
`_create_credentials_provider()` returns null (which happens when no credential 
configuration matches -- see the `LOG(ERROR)` at the end of that method). 
Dereferencing a null `shared_ptr` here will crash the BE process.
   
   **Fix**: Add a null check:
   ```cpp
   if (!_credentials_provider) {
       return Status::InternalError(
           "No AWS credentials provider configured. "
           "Please provide access_key/secret_key, role_arn, profile_name, or 
credentials_provider.");
   }
   ```



##########
be/src/runtime/routine_load/data_consumer.h:
##########
@@ -31,6 +31,7 @@
 #include "common/logging.h"
 #include "common/status.h"
 #include "librdkafka/rdkafkacpp.h"
+#include "runtime/routine_load/aws_msk_iam_auth.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "util/uid_util.h"
 

Review Comment:
   **Fragile declaration order**: The comment on the line above says 
`_aws_msk_oauth_callback` "must outlive `_k_consumer`", but it is declared 
**after** `_k_consumer`. C++ destroys members in reverse declaration order, so 
`_aws_msk_oauth_callback` would be destroyed **first**.
   
   Currently safe because the explicit destructor `delete`s `_k_consumer` 
before member destructors run. However, this is fragile -- if someone refactors 
to `std::unique_ptr<RdKafka::KafkaConsumer>`, the destruction order would be 
wrong and could cause use-after-free during `close()`.
   
   **Fix**: Move this declaration **before** `_k_consumer` to make the 
destruction order inherently correct:
   ```cpp
   std::unique_ptr<AwsMskIamOAuthCallback> _aws_msk_oauth_callback;
   KafkaEventCb _k_event_cb;
   RdKafka::KafkaConsumer* _k_consumer = nullptr;
   ```



##########
be/src/runtime/routine_load/aws_msk_iam_auth.cpp:
##########
@@ -0,0 +1,492 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/routine_load/aws_msk_iam_auth.h"
+
+#include <aws/core/auth/AWSCredentials.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/core/auth/AWSCredentialsProviderChain.h>
+#include <aws/core/auth/STSCredentialsProvider.h>
+#include <aws/core/platform/Environment.h>
+#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h>
+#include <aws/sts/STSClient.h>
+#include <aws/sts/model/AssumeRoleRequest.h>
+#include <openssl/hmac.h>
+#include <openssl/sha.h>
+
+#include <algorithm>
+#include <chrono>
+#include <iomanip>
+#include <sstream>
+
+#include "common/logging.h"
+
+namespace doris {
+
+AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) {
+    _credentials_provider = _create_credentials_provider();
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider> 
AwsMskIamAuth::_create_credentials_provider() {
+    if (!_config.role_arn.empty() && !_config.access_key.empty() && 
!_config.secret_key.empty()) {
+        LOG(INFO) << "Using AWS STS Assume Role with explicit credentials 
(cross-account): "
+                  << _config.role_arn << " (Access Key ID: " << 
_config.access_key.substr(0, 4)
+                  << "****)";
+
+        Aws::Client::ClientConfiguration client_config;
+        if (!_config.region.empty()) {
+            client_config.region = _config.region;
+        }
+
+        // Use explicit AK/SK as base credentials to assume the role
+        Aws::Auth::AWSCredentials base_credentials(_config.access_key, 
_config.secret_key);
+        auto base_provider =
+                
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials);
+
+        auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, 
client_config);
+
+        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+                _config.role_arn, Aws::String(), /* external_id */ 
Aws::String(),
+                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+    }
+    // 2. Explicit AK/SK credentials (direct access)
+    if (!_config.access_key.empty() && !_config.secret_key.empty()) {
+        LOG(INFO) << "Using explicit AWS credentials (Access Key ID: "
+                  << _config.access_key.substr(0, 4) << "****)";
+
+        Aws::Auth::AWSCredentials credentials(_config.access_key, 
_config.secret_key);
+
+        return 
std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials);
+    }
+    // 3. Assume Role with Instance Profile (for same-account access from 
within AWS)
+    if (!_config.role_arn.empty()) {
+        LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << 
_config.role_arn;
+
+        Aws::Client::ClientConfiguration client_config;
+        if (!_config.region.empty()) {
+            client_config.region = _config.region;
+        }
+
+        auto sts_client = std::make_shared<Aws::STS::STSClient>(
+                
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), 
client_config);
+
+        return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+                _config.role_arn, Aws::String(), /* external_id */ 
Aws::String(),
+                Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client);
+    }
+    // 4. AWS Profile (reads from ~/.aws/credentials)
+    if (!_config.profile_name.empty()) {
+        LOG(INFO) << "Using AWS Profile: " << _config.profile_name;
+
+        return 
std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>(
+                _config.profile_name.c_str());
+    }
+    // 5. Custom Credentials Provider
+    if (!_config.credentials_provider.empty()) {
+        LOG(INFO) << "Using custom credentials provider: " << 
_config.credentials_provider;
+
+        // Parse credentials provider type string
+        std::string provider_upper = _config.credentials_provider;
+        std::transform(provider_upper.begin(), provider_upper.end(), 
provider_upper.begin(),
+                       ::toupper);
+
+        if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") {
+            return 
std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>();
+        } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == 
"INSTANCEPROFILE") {
+            return 
std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>();
+        } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") {
+            return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>(
+                    
Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str());
+        } else if (provider_upper == "DEFAULT") {
+            return 
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+        } else {
+            LOG(WARNING) << "Unknown credentials provider type: " << 
_config.credentials_provider
+                         << ", falling back to default credentials provider 
chain";
+            return 
std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+        }
+    }
+    // No valid credentials configuration found
+    LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please 
provide.";
+    return nullptr;
+}
+
+Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    // Refresh if needed
+    if (_should_refresh_credentials()) {
+        _cached_credentials = _credentials_provider->GetAWSCredentials();
+
+        if (_cached_credentials.GetAWSAccessKeyId().empty()) {
+            return Status::InternalError("Failed to get AWS credentials");
+        }
+
+        // Set expiry time (assume 1 hour for instance profile, or use the 
credentials expiration)
+        _credentials_expiry = std::chrono::system_clock::now() + 
std::chrono::hours(1);
+
+        LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication";
+    }
+
+    *credentials = _cached_credentials;
+    return Status::OK();
+}
+
+bool AwsMskIamAuth::_should_refresh_credentials() {
+    auto now = std::chrono::system_clock::now();
+    auto refresh_time =
+            _credentials_expiry - 
std::chrono::milliseconds(_config.token_refresh_margin_ms);
+    return now >= refresh_time || 
_cached_credentials.GetAWSAccessKeyId().empty();
+}
+
+Status AwsMskIamAuth::generate_token(const std::string& broker_hostname, 
std::string* token,
+                                     int64_t* token_lifetime_ms) {
+    Aws::Auth::AWSCredentials credentials;
+    RETURN_IF_ERROR(get_credentials(&credentials));
+
+    std::string timestamp = _get_timestamp();
+    std::string date_stamp = _get_date_stamp(timestamp);
+
+    // AWS MSK IAM token is a base64-encoded presigned URL
+    // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python
+
+    // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM 
signer reference)
+    static constexpr int TOKEN_EXPIRY_SECONDS = 900;
+
+    // Build the endpoint URL
+    std::string endpoint_url = "https://kafka." + _config.region + 
".amazonaws.com/";
+
+    // Build credential scope
+    std::string credential_scope =
+            date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request";
+
+    // Build the canonical query string (sorted alphabetically)
+    // IMPORTANT: All query parameters must be included in the signature 
calculation
+    // Session Token must be in canonical query string if using temporary 
credentials
+    std::stringstream canonical_query_ss;
+    canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded :
+
+    // Add Algorithm
+    canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256";
+
+    // Add Credential
+    std::string credential = std::string(credentials.GetAWSAccessKeyId()) + 
"/" + credential_scope;
+    canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential);
+
+    // Add Date
+    canonical_query_ss << "&X-Amz-Date=" << timestamp;
+
+    // Add Expires
+    canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS;
+
+    // Add Security Token if present (MUST be before signature calculation)
+    if (!credentials.GetSessionToken().empty()) {
+        canonical_query_ss << "&X-Amz-Security-Token="
+                           << 
_url_encode(std::string(credentials.GetSessionToken()));
+    }
+
+    // Add SignedHeaders
+    canonical_query_ss << "&X-Amz-SignedHeaders=host";
+
+    std::string canonical_query_string = canonical_query_ss.str();
+
+    // Build the canonical headers
+    std::string host = "kafka." + _config.region + ".amazonaws.com";
+    std::string canonical_headers = "host:" + host + "\n";
+    std::string signed_headers = "host";
+
+    // Build the canonical request
+    std::string method = "GET";
+    std::string uri = "/";
+    std::string payload_hash = _sha256("");
+
+    std::string canonical_request = method + "\n" + uri + "\n" + 
canonical_query_string + "\n" +
+                                    canonical_headers + "\n" + signed_headers 
+ "\n" + payload_hash;
+
+    // Build the string to sign
+    std::string algorithm = "AWS4-HMAC-SHA256";
+    std::string canonical_request_hash = _sha256(canonical_request);
+    std::string string_to_sign =
+            algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" + 
canonical_request_hash;
+
+    // Calculate signature
+    std::string signing_key = 
_calculate_signing_key(std::string(credentials.GetAWSSecretKey()),
+                                                     date_stamp, 
_config.region, "kafka-cluster");
+    std::string signature = _hmac_sha256_hex(signing_key, string_to_sign);
+
+    // Build the final presigned URL
+    // All parameters are already in canonical_query_string, just add signature
+    // Then add User-Agent AFTER signature (not part of signed content, 
matching reference impl)
+    std::string signed_url = endpoint_url + "?" + canonical_query_string +
+                             "&X-Amz-Signature=" + signature +
+                             "&User-Agent=doris-msk-iam-auth%2F1.0";
+
+    // Base64url encode the signed URL (without padding)
+    *token = _base64url_encode(signed_url);
+
+    // Token lifetime in milliseconds
+    *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000;
+
+    LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url;
+

Review Comment:
   **Security Issue**: This logs the **complete presigned URL** at INFO level. 
The URL contains:
   - The full AWS Access Key ID (in `X-Amz-Credential`)
   - The full session token (in `X-Amz-Security-Token`) if using temporary 
credentials
   - The computed signature (in `X-Amz-Signature`)
   
   The presigned URL is a **bearer credential** -- anyone who possesses it can 
authenticate to the MSK cluster for 900 seconds. Logging it in production logs 
creates a credential leak vector.
   
   **Fix**: Either remove this log entirely, or downgrade it to `VLOG_DEBUG` 
and log only non-sensitive context (never the presigned URL itself):
   ```cpp
   VLOG_DEBUG << "Generated AWS MSK IAM token for region: " << _config.region;
   ```



##########
be/src/runtime/routine_load/data_consumer.cpp:
##########
@@ -99,6 +100,15 @@ Status 
KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) {
     }

Review Comment:
   **Bug (Critical)**: This check will **never match**. It tests whether the 
property **value** (`item.second`) starts with `"AWS:"`, but AWS property 
values are normal strings like `"us-east-1"`, `"AKIAIOSFODNN7EXAMPLE"`, etc. 
None of them start with `"AWS:"`.
   
   The correct fix is to check the property **key** (`item.first`) for the 
`"aws."` prefix:
   ```cpp
   if (starts_with(item.first, "aws.")) {
       LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first;
       continue;
   }
   ```
   
   Compare with the `starts_with(item.second, "FILE:")` check a few lines below 
-- that one is correct because FE explicitly encodes values as 
`"FILE:file_id:md5"`. There is no analogous `"AWS:"` value encoding.
   
   **Impact**: Without this fix, AWS properties (`aws.region`, 
`aws.access.key`, `aws.secret.key`, etc.) will be passed to librdkafka via 
`set_conf()`, which will reject them as unknown configuration properties, 
causing the routine load to fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to