github-actions[bot] commented on code in PR #60044: URL: https://github.com/apache/doris/pull/60044#discussion_r2890450824
########## be/src/runtime/routine_load/aws_msk_iam_auth.cpp: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/routine_load/aws_msk_iam_auth.h" + +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <aws/core/auth/AWSCredentialsProviderChain.h> +#include <aws/core/auth/STSCredentialsProvider.h> +#include <aws/core/platform/Environment.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> +#include <aws/sts/STSClient.h> +#include <aws/sts/model/AssumeRoleRequest.h> +#include <openssl/hmac.h> +#include <openssl/sha.h> + +#include <algorithm> +#include <chrono> +#include <iomanip> +#include <sstream> + +#include "common/logging.h" + +namespace doris { + +AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) { + _credentials_provider = _create_credentials_provider(); +} + +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> AwsMskIamAuth::_create_credentials_provider() { + if (!_config.role_arn.empty() && !_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with explicit credentials (cross-account): " + << _config.role_arn << " 
(Access Key ID: " << _config.access_key.substr(0, 4) + << "****)"; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + // Use explicit AK/SK as base credentials to assume the role + Aws::Auth::AWSCredentials base_credentials(_config.access_key, _config.secret_key); + auto base_provider = + std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials); + + auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 2. Explicit AK/SK credentials (direct access) + if (!_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using explicit AWS credentials (Access Key ID: " + << _config.access_key.substr(0, 4) << "****)"; + + Aws::Auth::AWSCredentials credentials(_config.access_key, _config.secret_key); + + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials); + } + // 3. Assume Role with Instance Profile (for same-account access from within AWS) + if (!_config.role_arn.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << _config.role_arn; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + auto sts_client = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 4. 
AWS Profile (reads from ~/.aws/credentials) + if (!_config.profile_name.empty()) { + LOG(INFO) << "Using AWS Profile: " << _config.profile_name; + + return std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>( + _config.profile_name.c_str()); + } + // 5. Custom Credentials Provider + if (!_config.credentials_provider.empty()) { + LOG(INFO) << "Using custom credentials provider: " << _config.credentials_provider; + + // Parse credentials provider type string + std::string provider_upper = _config.credentials_provider; + std::transform(provider_upper.begin(), provider_upper.end(), provider_upper.begin(), + ::toupper); + + if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") { + return std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>(); + } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == "INSTANCEPROFILE") { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") { + return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>( + Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str()); + } else if (provider_upper == "DEFAULT") { + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } else { + LOG(WARNING) << "Unknown credentials provider type: " << _config.credentials_provider + << ", falling back to default credentials provider chain"; + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } + } + // No valid credentials configuration found + LOG(ERROR) << "AWS MSK IAM authentication requires credentials. 
Please provide."; + return nullptr; +} + +Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) { + std::lock_guard<std::mutex> lock(_mutex); + + // Refresh if needed + if (_should_refresh_credentials()) { + _cached_credentials = _credentials_provider->GetAWSCredentials(); + + if (_cached_credentials.GetAWSAccessKeyId().empty()) { + return Status::InternalError("Failed to get AWS credentials"); + } + + // Set expiry time (assume 1 hour for instance profile, or use the credentials expiration) + _credentials_expiry = std::chrono::system_clock::now() + std::chrono::hours(1); + + LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication"; + } + + *credentials = _cached_credentials; + return Status::OK(); +} + +bool AwsMskIamAuth::_should_refresh_credentials() { + auto now = std::chrono::system_clock::now(); + auto refresh_time = + _credentials_expiry - std::chrono::milliseconds(_config.token_refresh_margin_ms); + return now >= refresh_time || _cached_credentials.GetAWSAccessKeyId().empty(); +} + +Status AwsMskIamAuth::generate_token(const std::string& broker_hostname, std::string* token, + int64_t* token_lifetime_ms) { + Aws::Auth::AWSCredentials credentials; + RETURN_IF_ERROR(get_credentials(&credentials)); + + std::string timestamp = _get_timestamp(); + std::string date_stamp = _get_date_stamp(timestamp); + + // AWS MSK IAM token is a base64-encoded presigned URL + // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python + + // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM signer reference) + static constexpr int TOKEN_EXPIRY_SECONDS = 900; + + // Build the endpoint URL + std::string endpoint_url = "https://kafka." 
+ _config.region + ".amazonaws.com/"; + + // Build credential scope + std::string credential_scope = + date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request"; + + // Build the canonical query string (sorted alphabetically) + // IMPORTANT: All query parameters must be included in the signature calculation + // Session Token must be in canonical query string if using temporary credentials + std::stringstream canonical_query_ss; + canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded : + + // Add Algorithm + canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256"; + + // Add Credential + std::string credential = std::string(credentials.GetAWSAccessKeyId()) + "/" + credential_scope; + canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential); + + // Add Date + canonical_query_ss << "&X-Amz-Date=" << timestamp; + + // Add Expires + canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS; + + // Add Security Token if present (MUST be before signature calculation) + if (!credentials.GetSessionToken().empty()) { + canonical_query_ss << "&X-Amz-Security-Token=" + << _url_encode(std::string(credentials.GetSessionToken())); + } + + // Add SignedHeaders + canonical_query_ss << "&X-Amz-SignedHeaders=host"; + + std::string canonical_query_string = canonical_query_ss.str(); + + // Build the canonical headers + std::string host = "kafka." 
+ _config.region + ".amazonaws.com"; + std::string canonical_headers = "host:" + host + "\n"; + std::string signed_headers = "host"; + + // Build the canonical request + std::string method = "GET"; + std::string uri = "/"; + std::string payload_hash = _sha256(""); + + std::string canonical_request = method + "\n" + uri + "\n" + canonical_query_string + "\n" + + canonical_headers + "\n" + signed_headers + "\n" + payload_hash; + + // Build the string to sign + std::string algorithm = "AWS4-HMAC-SHA256"; + std::string canonical_request_hash = _sha256(canonical_request); + std::string string_to_sign = + algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" + canonical_request_hash; + + // Calculate signature + std::string signing_key = _calculate_signing_key(std::string(credentials.GetAWSSecretKey()), + date_stamp, _config.region, "kafka-cluster"); + std::string signature = _hmac_sha256_hex(signing_key, string_to_sign); + + // Build the final presigned URL + // All parameters are already in canonical_query_string, just add signature + // Then add User-Agent AFTER signature (not part of signed content, matching reference impl) + std::string signed_url = endpoint_url + "?" 
+ canonical_query_string + + "&X-Amz-Signature=" + signature + + "&User-Agent=doris-msk-iam-auth%2F1.0"; + + // Base64url encode the signed URL (without padding) + *token = _base64url_encode(signed_url); + + // Token lifetime in milliseconds + *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000; + + LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url; + + return Status::OK(); +} + +std::string AwsMskIamAuth::_hmac_sha256_hex(const std::string& key, const std::string& data) { + std::string raw = _hmac_sha256(key, data); + std::stringstream ss; + for (unsigned char c : raw) { + ss << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(c); + } + return ss.str(); +} + +std::string AwsMskIamAuth::_url_encode(const std::string& value) { + std::ostringstream escaped; + escaped.fill('0'); + escaped << std::hex; + + for (char c : value) { + // Keep alphanumeric and other accepted characters intact + if (isalnum(static_cast<unsigned char>(c)) || c == '-' || c == '_' || c == '.' 
|| + c == '~') { + escaped << c; + } else { + // Any other characters are percent-encoded + escaped << std::uppercase; + escaped << '%' << std::setw(2) << static_cast<int>(static_cast<unsigned char>(c)); + escaped << std::nouppercase; + } + } + + return escaped.str(); +} + +std::string AwsMskIamAuth::_base64url_encode(const std::string& input) { + // Standard base64 alphabet + static const char* base64_chars = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + + std::string result; + result.reserve(((input.size() + 2) / 3) * 4); + + const unsigned char* bytes = reinterpret_cast<const unsigned char*>(input.c_str()); + size_t len = input.size(); + + for (size_t i = 0; i < len; i += 3) { + uint32_t n = static_cast<uint32_t>(bytes[i]) << 16; + if (i + 1 < len) n |= static_cast<uint32_t>(bytes[i + 1]) << 8; + if (i + 2 < len) n |= static_cast<uint32_t>(bytes[i + 2]); + + result += base64_chars[(n >> 18) & 0x3F]; + result += base64_chars[(n >> 12) & 0x3F]; + if (i + 1 < len) result += base64_chars[(n >> 6) & 0x3F]; + if (i + 2 < len) result += base64_chars[n & 0x3F]; + } + + // Convert to URL-safe base64 (replace + with -, / with _) + // and remove padding (=) + for (char& c : result) { + if (c == '+') + c = '-'; + else if (c == '/') + c = '_'; + } + + return result; +} + +std::string AwsMskIamAuth::_calculate_signing_key(const std::string& secret_key, + const std::string& date_stamp, + const std::string& region, + const std::string& service) { + std::string k_secret = "AWS4" + secret_key; + std::string k_date = _hmac_sha256(k_secret, date_stamp); + std::string k_region = _hmac_sha256(k_date, region); + std::string k_service = _hmac_sha256(k_region, service); + std::string k_signing = _hmac_sha256(k_service, "aws4_request"); + return k_signing; +} + +std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const std::string& data) { + unsigned char* digest; + digest = HMAC(EVP_sha256(), key.c_str(), static_cast<int>(key.length()), + 
reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), nullptr, Review Comment: **Bug (Critical - Thread Safety)**: `HMAC()` with `nullptr` as the output buffer (the `md` parameter, 6th argument) writes to a **process-global static buffer** that is shared across all threads. From the OpenSSL manual: > "If md is NULL, the digest is placed in a static array. Note: passing a NULL value for md to use the static array is not thread safe." The `_mutex` in `get_credentials()` does NOT protect this call -- `_hmac_sha256` is called from `generate_token()` -> `_calculate_signing_key()` and `_hmac_sha256_hex()`, which are outside the mutex scope. Multiple librdkafka background threads (one per consumer) can invoke `oauthbearer_token_refresh_cb` concurrently, causing a data race on the static buffer. Also, the return value of `HMAC()` is not checked for `NULL`. If it fails, constructing a `std::string` from a null pointer is undefined behavior. **Fix**: Use a caller-owned output buffer: ```cpp std::string AwsMskIamAuth::_hmac_sha256(const std::string& key, const std::string& data) { unsigned char digest[EVP_MAX_MD_SIZE]; unsigned int digest_len = 0; unsigned char* result = HMAC(EVP_sha256(), key.c_str(), static_cast<int>(key.length()), reinterpret_cast<const unsigned char*>(data.c_str()), data.length(), digest, &digest_len); DORIS_CHECK(result != nullptr) << "HMAC-SHA256 computation failed"; return {reinterpret_cast<char*>(digest), digest_len}; } ``` ########## be/src/runtime/routine_load/aws_msk_iam_auth.h: ########## @@ -0,0 +1,157 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <librdkafka/rdkafkacpp.h> + +#include <memory> +#include <mutex> +#include <string> +#include <unordered_map> + +#include "common/status.h" + +namespace doris { + Review Comment: **Missing compile_check headers**: Per coding standards, new `.h` files should include paired `compile_check_begin.h` / `compile_check_end.h`. See `data_consumer_group.h` in the same directory for the expected pattern: ```cpp namespace doris { #include "common/compile_check_begin.h" // ... class definitions ... #include "common/compile_check_end.h" } // namespace doris ``` ########## be/src/runtime/routine_load/aws_msk_iam_auth.cpp: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/routine_load/aws_msk_iam_auth.h" + +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <aws/core/auth/AWSCredentialsProviderChain.h> +#include <aws/core/auth/STSCredentialsProvider.h> +#include <aws/core/platform/Environment.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> +#include <aws/sts/STSClient.h> +#include <aws/sts/model/AssumeRoleRequest.h> +#include <openssl/hmac.h> +#include <openssl/sha.h> + +#include <algorithm> +#include <chrono> +#include <iomanip> +#include <sstream> + +#include "common/logging.h" + +namespace doris { + +AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) { + _credentials_provider = _create_credentials_provider(); +} + +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> AwsMskIamAuth::_create_credentials_provider() { + if (!_config.role_arn.empty() && !_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with explicit credentials (cross-account): " + << _config.role_arn << " (Access Key ID: " << _config.access_key.substr(0, 4) + << "****)"; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + // Use explicit AK/SK as base credentials to assume the role + Aws::Auth::AWSCredentials base_credentials(_config.access_key, _config.secret_key); + auto base_provider = + std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials); + + auto sts_client = std::make_shared<Aws::STS::STSClient>(base_provider, client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 2. 
Explicit AK/SK credentials (direct access) + if (!_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using explicit AWS credentials (Access Key ID: " + << _config.access_key.substr(0, 4) << "****)"; + + Aws::Auth::AWSCredentials credentials(_config.access_key, _config.secret_key); + + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials); + } + // 3. Assume Role with Instance Profile (for same-account access from within AWS) + if (!_config.role_arn.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << _config.role_arn; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + auto sts_client = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 4. AWS Profile (reads from ~/.aws/credentials) + if (!_config.profile_name.empty()) { + LOG(INFO) << "Using AWS Profile: " << _config.profile_name; + + return std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>( + _config.profile_name.c_str()); + } + // 5. 
Custom Credentials Provider + if (!_config.credentials_provider.empty()) { + LOG(INFO) << "Using custom credentials provider: " << _config.credentials_provider; + + // Parse credentials provider type string + std::string provider_upper = _config.credentials_provider; + std::transform(provider_upper.begin(), provider_upper.end(), provider_upper.begin(), + ::toupper); + + if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") { + return std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>(); + } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == "INSTANCEPROFILE") { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") { + return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>( + Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str()); + } else if (provider_upper == "DEFAULT") { + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } else { + LOG(WARNING) << "Unknown credentials provider type: " << _config.credentials_provider + << ", falling back to default credentials provider chain"; + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } + } + // No valid credentials configuration found + LOG(ERROR) << "AWS MSK IAM authentication requires credentials. Please provide."; + return nullptr; +} + +Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) { + std::lock_guard<std::mutex> lock(_mutex); + + // Refresh if needed + if (_should_refresh_credentials()) { + _cached_credentials = _credentials_provider->GetAWSCredentials(); + Review Comment: **Bug (Crash)**: `_credentials_provider` can be `nullptr` if `_create_credentials_provider()` returns null (which happens when no credential configuration matches -- see the `LOG(ERROR)` at the end of that method). Dereferencing a null `shared_ptr` here will crash the BE process. 
**Fix**: Add a null check: ```cpp if (!_credentials_provider) { return Status::InternalError( "No AWS credentials provider configured. " "Please provide access_key/secret_key, role_arn, profile_name, or credentials_provider."); } ``` ########## be/src/runtime/routine_load/data_consumer.h: ########## @@ -31,6 +31,7 @@ #include "common/logging.h" #include "common/status.h" #include "librdkafka/rdkafkacpp.h" +#include "runtime/routine_load/aws_msk_iam_auth.h" #include "runtime/stream_load/stream_load_context.h" #include "util/uid_util.h" Review Comment: **Fragile declaration order**: The comment on the line above says `_aws_msk_oauth_callback` "must outlive `_k_consumer`", but it is declared **after** `_k_consumer`. C++ destroys members in reverse declaration order, so `_aws_msk_oauth_callback` would be destroyed **first**. Currently safe because the explicit destructor `delete`s `_k_consumer` before member destructors run. However, this is fragile -- if someone refactors to `std::unique_ptr<RdKafka::KafkaConsumer>`, the destruction order would be wrong and could cause use-after-free during `close()`. **Fix**: Move this declaration **before** `_k_consumer` to make the destruction order inherently correct: ```cpp std::unique_ptr<AwsMskIamOAuthCallback> _aws_msk_oauth_callback; KafkaEventCb _k_event_cb; RdKafka::KafkaConsumer* _k_consumer = nullptr; ``` ########## be/src/runtime/routine_load/aws_msk_iam_auth.cpp: ########## @@ -0,0 +1,492 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/routine_load/aws_msk_iam_auth.h" + +#include <aws/core/auth/AWSCredentials.h> +#include <aws/core/auth/AWSCredentialsProvider.h> +#include <aws/core/auth/AWSCredentialsProviderChain.h> +#include <aws/core/auth/STSCredentialsProvider.h> +#include <aws/core/platform/Environment.h> +#include <aws/identity-management/auth/STSAssumeRoleCredentialsProvider.h> +#include <aws/sts/STSClient.h> +#include <aws/sts/model/AssumeRoleRequest.h> +#include <openssl/hmac.h> +#include <openssl/sha.h> + +#include <algorithm> +#include <chrono> +#include <iomanip> +#include <sstream> + +#include "common/logging.h" + +namespace doris { + +AwsMskIamAuth::AwsMskIamAuth(Config config) : _config(std::move(config)) { + _credentials_provider = _create_credentials_provider(); +} + +std::shared_ptr<Aws::Auth::AWSCredentialsProvider> AwsMskIamAuth::_create_credentials_provider() { + if (!_config.role_arn.empty() && !_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with explicit credentials (cross-account): " + << _config.role_arn << " (Access Key ID: " << _config.access_key.substr(0, 4) + << "****)"; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + // Use explicit AK/SK as base credentials to assume the role + Aws::Auth::AWSCredentials base_credentials(_config.access_key, _config.secret_key); + auto base_provider = + std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(base_credentials); + + auto sts_client = 
std::make_shared<Aws::STS::STSClient>(base_provider, client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 2. Explicit AK/SK credentials (direct access) + if (!_config.access_key.empty() && !_config.secret_key.empty()) { + LOG(INFO) << "Using explicit AWS credentials (Access Key ID: " + << _config.access_key.substr(0, 4) << "****)"; + + Aws::Auth::AWSCredentials credentials(_config.access_key, _config.secret_key); + + return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(credentials); + } + // 3. Assume Role with Instance Profile (for same-account access from within AWS) + if (!_config.role_arn.empty()) { + LOG(INFO) << "Using AWS STS Assume Role with Instance Profile: " << _config.role_arn; + + Aws::Client::ClientConfiguration client_config; + if (!_config.region.empty()) { + client_config.region = _config.region; + } + + auto sts_client = std::make_shared<Aws::STS::STSClient>( + std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(), client_config); + + return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>( + _config.role_arn, Aws::String(), /* external_id */ Aws::String(), + Aws::Auth::DEFAULT_CREDS_LOAD_FREQ_SECONDS, sts_client); + } + // 4. AWS Profile (reads from ~/.aws/credentials) + if (!_config.profile_name.empty()) { + LOG(INFO) << "Using AWS Profile: " << _config.profile_name; + + return std::make_shared<Aws::Auth::ProfileConfigFileAWSCredentialsProvider>( + _config.profile_name.c_str()); + } + // 5. 
Custom Credentials Provider + if (!_config.credentials_provider.empty()) { + LOG(INFO) << "Using custom credentials provider: " << _config.credentials_provider; + + // Parse credentials provider type string + std::string provider_upper = _config.credentials_provider; + std::transform(provider_upper.begin(), provider_upper.end(), provider_upper.begin(), + ::toupper); + + if (provider_upper == "ENV" || provider_upper == "ENVIRONMENT") { + return std::make_shared<Aws::Auth::EnvironmentAWSCredentialsProvider>(); + } else if (provider_upper == "INSTANCE_PROFILE" || provider_upper == "INSTANCEPROFILE") { + return std::make_shared<Aws::Auth::InstanceProfileCredentialsProvider>(); + } else if (provider_upper == "CONTAINER" || provider_upper == "ECS") { + return std::make_shared<Aws::Auth::TaskRoleCredentialsProvider>( + Aws::Environment::GetEnv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI").c_str()); + } else if (provider_upper == "DEFAULT") { + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } else { + LOG(WARNING) << "Unknown credentials provider type: " << _config.credentials_provider + << ", falling back to default credentials provider chain"; + return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>(); + } + } + // No valid credentials configuration found + LOG(ERROR) << "AWS MSK IAM authentication requires credentials. 
Please provide."; + return nullptr; +} + +Status AwsMskIamAuth::get_credentials(Aws::Auth::AWSCredentials* credentials) { + std::lock_guard<std::mutex> lock(_mutex); + + // Refresh if needed + if (_should_refresh_credentials()) { + _cached_credentials = _credentials_provider->GetAWSCredentials(); + + if (_cached_credentials.GetAWSAccessKeyId().empty()) { + return Status::InternalError("Failed to get AWS credentials"); + } + + // Set expiry time (assume 1 hour for instance profile, or use the credentials expiration) + _credentials_expiry = std::chrono::system_clock::now() + std::chrono::hours(1); + + LOG(INFO) << "Refreshed AWS credentials for MSK IAM authentication"; + } + + *credentials = _cached_credentials; + return Status::OK(); +} + +bool AwsMskIamAuth::_should_refresh_credentials() { + auto now = std::chrono::system_clock::now(); + auto refresh_time = + _credentials_expiry - std::chrono::milliseconds(_config.token_refresh_margin_ms); + return now >= refresh_time || _cached_credentials.GetAWSAccessKeyId().empty(); +} + +Status AwsMskIamAuth::generate_token(const std::string& broker_hostname, std::string* token, + int64_t* token_lifetime_ms) { + Aws::Auth::AWSCredentials credentials; + RETURN_IF_ERROR(get_credentials(&credentials)); + + std::string timestamp = _get_timestamp(); + std::string date_stamp = _get_date_stamp(timestamp); + + // AWS MSK IAM token is a base64-encoded presigned URL + // Reference: https://github.com/aws/aws-msk-iam-sasl-signer-python + + // Token expiry in seconds (900 seconds = 15 minutes, matching AWS MSK IAM signer reference) + static constexpr int TOKEN_EXPIRY_SECONDS = 900; + + // Build the endpoint URL + std::string endpoint_url = "https://kafka." 
+ _config.region + ".amazonaws.com/"; + + // Build credential scope + std::string credential_scope = + date_stamp + "/" + _config.region + "/kafka-cluster/aws4_request"; + + // Build the canonical query string (sorted alphabetically) + // IMPORTANT: All query parameters must be included in the signature calculation + // Session Token must be in canonical query string if using temporary credentials + std::stringstream canonical_query_ss; + canonical_query_ss << "Action=kafka-cluster%3AConnect"; // URL-encoded : + + // Add Algorithm + canonical_query_ss << "&X-Amz-Algorithm=AWS4-HMAC-SHA256"; + + // Add Credential + std::string credential = std::string(credentials.GetAWSAccessKeyId()) + "/" + credential_scope; + canonical_query_ss << "&X-Amz-Credential=" << _url_encode(credential); + + // Add Date + canonical_query_ss << "&X-Amz-Date=" << timestamp; + + // Add Expires + canonical_query_ss << "&X-Amz-Expires=" << TOKEN_EXPIRY_SECONDS; + + // Add Security Token if present (MUST be before signature calculation) + if (!credentials.GetSessionToken().empty()) { + canonical_query_ss << "&X-Amz-Security-Token=" + << _url_encode(std::string(credentials.GetSessionToken())); + } + + // Add SignedHeaders + canonical_query_ss << "&X-Amz-SignedHeaders=host"; + + std::string canonical_query_string = canonical_query_ss.str(); + + // Build the canonical headers + std::string host = "kafka." 
+ _config.region + ".amazonaws.com"; + std::string canonical_headers = "host:" + host + "\n"; + std::string signed_headers = "host"; + + // Build the canonical request + std::string method = "GET"; + std::string uri = "/"; + std::string payload_hash = _sha256(""); + + std::string canonical_request = method + "\n" + uri + "\n" + canonical_query_string + "\n" + + canonical_headers + "\n" + signed_headers + "\n" + payload_hash; + + // Build the string to sign + std::string algorithm = "AWS4-HMAC-SHA256"; + std::string canonical_request_hash = _sha256(canonical_request); + std::string string_to_sign = + algorithm + "\n" + timestamp + "\n" + credential_scope + "\n" + canonical_request_hash; + + // Calculate signature + std::string signing_key = _calculate_signing_key(std::string(credentials.GetAWSSecretKey()), + date_stamp, _config.region, "kafka-cluster"); + std::string signature = _hmac_sha256_hex(signing_key, string_to_sign); + + // Build the final presigned URL + // All parameters are already in canonical_query_string, just add signature + // Then add User-Agent AFTER signature (not part of signed content, matching reference impl) + std::string signed_url = endpoint_url + "?" + canonical_query_string + + "&X-Amz-Signature=" + signature + + "&User-Agent=doris-msk-iam-auth%2F1.0"; + + // Base64url encode the signed URL (without padding) + *token = _base64url_encode(signed_url); + + // Token lifetime in milliseconds + *token_lifetime_ms = TOKEN_EXPIRY_SECONDS * 1000; + + LOG(INFO) << "Generated AWS MSK IAM token, presigned URL: " << signed_url; + Review Comment: **Security Issue**: This logs the **complete presigned URL** at INFO level. The URL contains: - The full AWS Access Key ID (in `X-Amz-Credential`) - The full session token (in `X-Amz-Security-Token`) if using temporary credentials - The computed signature (in `X-Amz-Signature`) The presigned URL is a **bearer credential** -- anyone who possesses it can authenticate to the MSK cluster for 900 seconds. 
Logging it in production logs creates a credential leak vector. **Fix**: Either remove this log entirely or use `VLOG_DEBUG` and redact the sensitive query parameters: ```cpp VLOG_DEBUG << "Generated AWS MSK IAM token for region: " << _config.region; ``` ########## be/src/runtime/routine_load/data_consumer.cpp: ########## @@ -99,6 +100,15 @@ Status KafkaDataConsumer::init(std::shared_ptr<StreamLoadContext> ctx) { } Review Comment: **Bug (Critical)**: This check will **never match**. It tests whether the property **value** (`item.second`) starts with `"AWS:"`, but AWS property values are normal strings like `"us-east-1"`, `"AKIAIOSFODNN7EXAMPLE"`, etc. None of them start with `"AWS:"`. The correct fix is to check the property **key** (`item.first`) for the `"aws."` prefix: ```cpp if (starts_with(item.first, "aws.")) { LOG(INFO) << "Skipping AWS property for librdkafka: " << item.first; continue; } ``` Compare with the `starts_with(item.second, "FILE:")` check a few lines below -- that one is correct because FE explicitly encodes values as `"FILE:file_id:md5"`. There is no analogous `"AWS:"` value encoding. **Impact**: Without this fix, AWS properties (`aws.region`, `aws.access.key`, `aws.secret.key`, etc.) will be passed to librdkafka via `set_conf()`, which will reject them as unknown configuration properties, causing the routine load to fail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
