Copilot commented on code in PR #12305:
URL: https://github.com/apache/gluten/pull/12305#discussion_r3508735122
##########
cpp/velox/filesystem/GlutenS3FileSystem.cc:
##########
@@ -29,6 +66,460 @@ namespace filesystems = facebook::velox::filesystems;
namespace {
+using namespace facebook::velox::filesystems;
+
+constexpr std::string_view kPartUploadAsync{"part-upload-async"};
+constexpr std::string_view kPartUploadAsyncLegacy{"uploadPartAsync"};
+constexpr std::string_view
kMaxConcurrentUploadNum{"max-concurrent-upload-num"};
+constexpr std::string_view kUploadThreads{"upload-threads"};
+constexpr uint32_t kDefaultMaxConcurrentUploadNum{4};
+constexpr uint32_t kDefaultUploadThreads{16};
+constexpr const char* kApplicationOctetStream = "application/octet-stream";
+
+std::string baseS3ConfigKey(std::string_view suffix) {
+ std::string key(filesystems::S3Config::kS3Prefix);
+ key.append(suffix);
+ return key;
+}
+
+std::string bucketS3ConfigKey(std::string_view bucketName, std::string_view
suffix) {
+ std::string key(filesystems::S3Config::kS3BucketPrefix);
+ key.append(bucketName);
+ key.append(".");
+ key.append(suffix);
+ return key;
+}
+
+std::optional<std::string> getS3ConfigValue(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix) {
+ if (auto value = config->get<std::string>(bucketS3ConfigKey(bucketName,
suffix))) {
+ return value;
+ }
+ return config->get<std::string>(baseS3ConfigKey(suffix));
+}
+
+std::optional<std::string> getS3ConfigValue(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix,
+ std::string_view legacySuffix) {
+ auto value = getS3ConfigValue(config, bucketName, suffix);
+ if (value.has_value()) {
+ return value;
+ }
+ return getS3ConfigValue(config, bucketName, legacySuffix);
+}
+
+uint32_t getUInt32S3Config(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix,
+ uint32_t defaultValue) {
+ const auto value = getS3ConfigValue(config, bucketName, suffix);
+ return value.has_value() ? folly::to<uint32_t>(value.value()) : defaultValue;
+}
+
+// Supported values are "Always", "RequestDependent", "Never"(default).
+Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy
inferPayloadSign(std::string sign) {
+ std::transform(sign.begin(), sign.end(), sign.begin(), [](unsigned char c) {
return std::toupper(c); });
+ if (sign == "ALWAYS") {
+ return Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always;
+ }
+ if (sign == "REQUESTDEPENDENT") {
+ return
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent;
+ }
+ return Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never;
+}
+
+std::optional<std::shared_ptr<Aws::Client::RetryStrategy>> getRetryStrategy(
+ const std::shared_ptr<filesystems::S3Config>& s3Config) {
+ auto retryMode = s3Config->retryMode();
+ auto maxAttempts = s3Config->maxAttempts();
+ if (!retryMode.has_value()) {
+ return std::nullopt;
+ }
+
+ if (retryMode.value() == "standard") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::StandardRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::StandardRetryStrategy>();
+ }
+
+ if (retryMode.value() == "adaptive") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::AdaptiveRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::AdaptiveRetryStrategy>();
+ }
+
+ if (retryMode.value() == "legacy") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::DefaultRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::DefaultRetryStrategy>();
+ }
+
+ VELOX_USER_FAIL("Invalid retry mode for S3: {}", retryMode.value());
+ return std::nullopt;
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
getCredentialsProvider(const filesystems::S3Config& s3Config) {
+ VELOX_USER_CHECK(
+ !s3Config.credentialsProvider().has_value(),
+ "Gluten async S3 multipart upload does not support custom AWS
credentials providers yet.");
+
+ auto accessKey = s3Config.accessKey();
+ auto secretKey = s3Config.secretKey();
+ const auto iamRole = s3Config.iamRole();
+
+ int keyCount = accessKey.has_value() + secretKey.has_value();
+ VELOX_USER_CHECK(keyCount != 1, "Invalid configuration: both access key and
secret key must be specified");
+
+ int configCount =
+ (accessKey.has_value() && secretKey.has_value()) + iamRole.has_value() +
s3Config.useInstanceCredentials();
+ VELOX_USER_CHECK(
+ configCount <= 1,
+ "Invalid configuration: specify only one among 'access/secret keys',
'use instance credentials', 'IAM role'");
+
+ if (accessKey.has_value() && secretKey.has_value()) {
+ return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
+ filesystems::awsString(accessKey.value()),
filesystems::awsString(secretKey.value()));
+ }
+
+ if (s3Config.useInstanceCredentials()) {
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ }
+
+ if (iamRole.has_value()) {
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ filesystems::awsString(iamRole.value()),
filesystems::awsString(s3Config.iamRoleSessionName()));
+ }
+
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+}
+
+std::shared_ptr<Aws::S3::S3Client> createWriteClient(const
std::shared_ptr<filesystems::S3Config>& s3Config) {
+ Aws::Client::ClientConfigurationInitValues initValues;
+ initValues.shouldDisableIMDS = !s3Config->useIMDS();
+ Aws::S3::S3ClientConfiguration clientConfig(initValues);
+ clientConfig.checksumConfig.requestChecksumCalculation =
Aws::Client::RequestChecksumCalculation::WHEN_REQUIRED;
+ clientConfig.checksumConfig.responseChecksumValidation =
Aws::Client::ResponseChecksumValidation::WHEN_REQUIRED;
+
+ if (s3Config->endpoint().has_value()) {
+ clientConfig.endpointOverride = s3Config->endpoint().value();
+ }
+ if (s3Config->endpointRegion().has_value()) {
+ clientConfig.region = s3Config->endpointRegion().value();
+ }
+ if (s3Config->useProxyFromEnv()) {
+ auto proxyConfig =
+
filesystems::S3ProxyConfigurationBuilder(s3Config->endpoint().has_value() ?
s3Config->endpoint().value() : "")
+ .useSsl(s3Config->useSSL())
+ .build();
+ if (proxyConfig.has_value()) {
+ clientConfig.proxyScheme =
Aws::Http::SchemeMapper::FromString(proxyConfig.value().scheme().c_str());
+ clientConfig.proxyHost =
filesystems::awsString(proxyConfig.value().host());
+ clientConfig.proxyPort = proxyConfig.value().port();
+ clientConfig.proxyUserName =
filesystems::awsString(proxyConfig.value().username());
+ clientConfig.proxyPassword =
filesystems::awsString(proxyConfig.value().password());
+ }
+ }
+
+ clientConfig.scheme = s3Config->useSSL() ? Aws::Http::Scheme::HTTPS :
Aws::Http::Scheme::HTTP;
+
+ if (s3Config->connectTimeout().has_value()) {
+ clientConfig.connectTimeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
velox::config::toDuration(s3Config->connectTimeout().value()))
+ .count();
+ }
+ if (s3Config->socketTimeout().has_value()) {
+ clientConfig.requestTimeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
velox::config::toDuration(s3Config->socketTimeout().value()))
+ .count();
+ }
+ if (s3Config->maxConnections().has_value()) {
+ clientConfig.maxConnections = s3Config->maxConnections().value();
+ }
+
+ auto retryStrategy = getRetryStrategy(s3Config);
+ if (retryStrategy.has_value()) {
+ clientConfig.retryStrategy = retryStrategy.value();
+ }
+
+ clientConfig.useVirtualAddressing = s3Config->useVirtualAddressing();
+ clientConfig.payloadSigningPolicy =
inferPayloadSign(s3Config->payloadSigningPolicy());
+
+ return std::make_shared<Aws::S3::S3Client>(
+ getCredentialsProvider(*s3Config), nullptr /* endpointProvider */,
clientConfig);
+}
+
+class GlutenS3WriteFile : public velox::WriteFile {
+ public:
+ GlutenS3WriteFile(
+ std::string_view path,
+ Aws::S3::S3Client* client,
+ velox::memory::MemoryPool* pool,
+ size_t minPartSize,
+ uint32_t maxConcurrentUploadNum,
+ uint32_t uploadThreads)
+ : client_(client),
+ pool_(pool),
+ minPartSize_(minPartSize),
+
uploadThrottle_(std::make_unique<folly::ThrottledLifoSem>(maxConcurrentUploadNum)),
+ uploadThreadPool_(uploadThreadPool(uploadThreads)) {
+ VELOX_CHECK_NOT_NULL(client_);
+ VELOX_CHECK_NOT_NULL(pool_);
+ filesystems::getBucketAndKeyFromPath(path, bucket_, key_);
+ currentPart_ =
std::make_unique<velox::dwio::common::DataBuffer<char>>(*pool_);
+ currentPart_->reserve(minPartSize_);
+ ensureObjectDoesNotExist();
+ createBucketIfMissing();
+ fileSize_ = 0;
+ }
+
+ ~GlutenS3WriteFile() override {
+ if (!uploadFutures_.empty()) {
+ try {
+ waitForAsyncUploads();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Failed while waiting for S3 async uploads: " <<
e.what();
+ }
+ }
+ }
+
+ void append(std::string_view data) override {
+ VELOX_CHECK(!closed(), "File is closed");
+ if (data.size() + currentPart_->size() >= minPartSize_) {
+ if (uploadState_.partNumber == 0) {
+ createMultipartUploadRequest();
+ }
+ upload(data);
+ } else {
+ currentPart_->unsafeAppend(data.data(), data.size());
+ }
+ fileSize_ += data.size();
+ }
+
+ void flush() override {
+ VELOX_CHECK(!closed(), "File is closed");
+ VELOX_CHECK_LT(currentPart_->size(), minPartSize_);
+ }
+
+ void close() override {
+ if (closed()) {
+ return;
+ }
+ if (uploadState_.partNumber == 0) {
+ putObjectRequest();
+ currentPart_->clear();
+ return;
+ }
+
+ RECORD_METRIC_VALUE(filesystems::kMetricS3StartedUploads);
+ uploadPart({currentPart_->data(), currentPart_->size()}, true);
+ waitForAsyncUploads();
+ VELOX_CHECK_EQ(uploadState_.partNumber,
uploadState_.completedParts.size());
+ completeMultipartUpload();
+ currentPart_->clear();
+ }
+
+ uint64_t size() const override {
+ return fileSize_;
+ }
+
+ private:
+ struct UploadState {
+ Aws::Vector<Aws::S3::Model::CompletedPart> completedParts;
+ int64_t partNumber = 0;
+ Aws::String id;
+ };
+
+ static std::shared_ptr<folly::CPUThreadPoolExecutor>
uploadThreadPool(uint32_t uploadThreads) {
+ std::lock_guard<std::mutex> l(uploadThreadPoolMutex_);
+ if (!sharedUploadThreadPool_) {
+ sharedUploadThreadPool_ = std::make_shared<folly::CPUThreadPoolExecutor>(
+ uploadThreads,
std::make_shared<folly::NamedThreadFactory>("s3-upload-thread"));
+ }
+ return sharedUploadThreadPool_;
+ }
+
+ bool closed() const {
+ return currentPart_->capacity() == 0;
+ }
+
+ void ensureObjectDoesNotExist() {
+ Aws::S3::Model::HeadObjectRequest request;
+ request.SetBucket(filesystems::awsString(bucket_));
+ request.SetKey(filesystems::awsString(key_));
+ RECORD_METRIC_VALUE(filesystems::kMetricS3MetadataCalls);
+ auto objectMetadata = client_->HeadObject(request);
+ if (!objectMetadata.IsSuccess()) {
+ RECORD_METRIC_VALUE(filesystems::kMetricS3GetMetadataErrors);
+ }
+ RECORD_METRIC_VALUE(filesystems::kMetricS3GetObjectRetries,
objectMetadata.GetRetryCount());
+ VELOX_CHECK(!objectMetadata.IsSuccess(), "S3 object already exists:
bucket={}, key={}", bucket_, key_);
+ }
+
+ void createBucketIfMissing() {
+ Aws::S3::Model::HeadBucketRequest request;
+ request.SetBucket(filesystems::awsString(bucket_));
+ auto bucketMetadata = client_->HeadBucket(request);
+ if (bucketMetadata.IsSuccess()) {
+ return;
+ }
+
+ Aws::S3::Model::CreateBucketRequest createRequest;
+ createRequest.SetBucket(filesystems::awsString(bucket_));
+ auto outcome = client_->CreateBucket(createRequest);
+ VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to create S3 bucket", bucket_,
"");
+ }
+
+ void createMultipartUploadRequest() {
+ Aws::S3::Model::CreateMultipartUploadRequest request;
+ request.SetBucket(filesystems::awsString(bucket_));
+ request.SetKey(filesystems::awsString(key_));
+ request.SetContentType(kApplicationOctetStream);
+ auto outcome = client_->CreateMultipartUpload(request);
+ VELOX_CHECK_AWS_OUTCOME(outcome, "Failed initiating multiple part upload",
bucket_, key_);
+ uploadState_.id = outcome.GetResult().GetUploadId();
+ }
+
+ void putObjectRequest() {
+ Aws::S3::Model::PutObjectRequest request;
+ request.SetBucket(filesystems::awsString(bucket_));
+ request.SetKey(filesystems::awsString(key_));
+ request.SetContentType(kApplicationOctetStream);
+ request.SetContentLength(currentPart_->size());
+
request.SetBody(std::make_shared<filesystems::StringViewStream>(currentPart_->data(),
currentPart_->size()));
+ RECORD_METRIC_VALUE(filesystems::kMetricS3StartedUploads);
+ auto outcome = client_->PutObject(request);
+ if (outcome.IsSuccess()) {
+ RECORD_METRIC_VALUE(filesystems::kMetricS3SuccessfulUploads);
+ } else {
+ RECORD_METRIC_VALUE(filesystems::kMetricS3FailedUploads);
+ }
+ VELOX_CHECK_AWS_OUTCOME(outcome, "Failed single object upload", bucket_,
key_);
+ }
+
+ void completeMultipartUpload() {
+ Aws::S3::Model::CompletedMultipartUpload completedUpload;
+ completedUpload.SetParts(uploadState_.completedParts);
+ Aws::S3::Model::CompleteMultipartUploadRequest request;
+ request.SetBucket(filesystems::awsString(bucket_));
+ request.SetKey(filesystems::awsString(key_));
+ request.SetUploadId(uploadState_.id);
+ request.SetMultipartUpload(std::move(completedUpload));
+
+ auto outcome = client_->CompleteMultipartUpload(request);
+ if (outcome.IsSuccess()) {
+ RECORD_METRIC_VALUE(filesystems::kMetricS3SuccessfulUploads);
+ } else {
+ RECORD_METRIC_VALUE(filesystems::kMetricS3FailedUploads);
+ }
+ VELOX_CHECK_AWS_OUTCOME(outcome, "Failed to complete multiple part
upload", bucket_, key_);
+ }
+
+ void upload(std::string_view data) {
+ auto dataPtr = data.data();
+ auto dataSize = data.size();
+ auto remainingBufferSize = currentPart_->capacity() - currentPart_->size();
+ currentPart_->unsafeAppend(dataPtr, remainingBufferSize);
+ uploadPart({currentPart_->data(), currentPart_->size()});
+ dataPtr += remainingBufferSize;
+ dataSize -= remainingBufferSize;
+ while (dataSize > minPartSize_) {
+ uploadPart({dataPtr, minPartSize_});
+ dataPtr += minPartSize_;
+ dataSize -= minPartSize_;
+ }
+ currentPart_->unsafeAppend(0, dataPtr, dataSize);
+ }
Review Comment:
In upload(), the buffer holding the current part is uploaded via
uploadPart(...) but never reset before more data is appended. This can
re-upload the same bytes and/or append past the reserved capacity, corrupting
the uploaded object.
##########
cpp/velox/filesystem/GlutenS3FileSystem.cc:
##########
@@ -29,6 +66,460 @@ namespace filesystems = facebook::velox::filesystems;
namespace {
+using namespace facebook::velox::filesystems;
+
+constexpr std::string_view kPartUploadAsync{"part-upload-async"};
+constexpr std::string_view kPartUploadAsyncLegacy{"uploadPartAsync"};
+constexpr std::string_view
kMaxConcurrentUploadNum{"max-concurrent-upload-num"};
+constexpr std::string_view kUploadThreads{"upload-threads"};
+constexpr uint32_t kDefaultMaxConcurrentUploadNum{4};
+constexpr uint32_t kDefaultUploadThreads{16};
+constexpr const char* kApplicationOctetStream = "application/octet-stream";
+
+std::string baseS3ConfigKey(std::string_view suffix) {
+ std::string key(filesystems::S3Config::kS3Prefix);
+ key.append(suffix);
+ return key;
+}
+
+std::string bucketS3ConfigKey(std::string_view bucketName, std::string_view
suffix) {
+ std::string key(filesystems::S3Config::kS3BucketPrefix);
+ key.append(bucketName);
+ key.append(".");
+ key.append(suffix);
+ return key;
+}
+
+std::optional<std::string> getS3ConfigValue(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix) {
+ if (auto value = config->get<std::string>(bucketS3ConfigKey(bucketName,
suffix))) {
+ return value;
+ }
+ return config->get<std::string>(baseS3ConfigKey(suffix));
+}
+
+std::optional<std::string> getS3ConfigValue(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix,
+ std::string_view legacySuffix) {
+ auto value = getS3ConfigValue(config, bucketName, suffix);
+ if (value.has_value()) {
+ return value;
+ }
+ return getS3ConfigValue(config, bucketName, legacySuffix);
+}
+
+uint32_t getUInt32S3Config(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix,
+ uint32_t defaultValue) {
+ const auto value = getS3ConfigValue(config, bucketName, suffix);
+ return value.has_value() ? folly::to<uint32_t>(value.value()) : defaultValue;
+}
+
+// Supported values are "Always", "RequestDependent", "Never"(default).
+Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy
inferPayloadSign(std::string sign) {
+ std::transform(sign.begin(), sign.end(), sign.begin(), [](unsigned char c) {
return std::toupper(c); });
+ if (sign == "ALWAYS") {
+ return Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always;
+ }
+ if (sign == "REQUESTDEPENDENT") {
+ return
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent;
+ }
+ return Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never;
+}
+
+std::optional<std::shared_ptr<Aws::Client::RetryStrategy>> getRetryStrategy(
+ const std::shared_ptr<filesystems::S3Config>& s3Config) {
+ auto retryMode = s3Config->retryMode();
+ auto maxAttempts = s3Config->maxAttempts();
+ if (!retryMode.has_value()) {
+ return std::nullopt;
+ }
+
+ if (retryMode.value() == "standard") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::StandardRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::StandardRetryStrategy>();
+ }
+
+ if (retryMode.value() == "adaptive") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::AdaptiveRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::AdaptiveRetryStrategy>();
+ }
+
+ if (retryMode.value() == "legacy") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::DefaultRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::DefaultRetryStrategy>();
+ }
+
+ VELOX_USER_FAIL("Invalid retry mode for S3: {}", retryMode.value());
+ return std::nullopt;
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
getCredentialsProvider(const filesystems::S3Config& s3Config) {
+ VELOX_USER_CHECK(
+ !s3Config.credentialsProvider().has_value(),
+ "Gluten async S3 multipart upload does not support custom AWS
credentials providers yet.");
+
+ auto accessKey = s3Config.accessKey();
+ auto secretKey = s3Config.secretKey();
+ const auto iamRole = s3Config.iamRole();
+
+ int keyCount = accessKey.has_value() + secretKey.has_value();
+ VELOX_USER_CHECK(keyCount != 1, "Invalid configuration: both access key and
secret key must be specified");
+
+ int configCount =
+ (accessKey.has_value() && secretKey.has_value()) + iamRole.has_value() +
s3Config.useInstanceCredentials();
+ VELOX_USER_CHECK(
+ configCount <= 1,
+ "Invalid configuration: specify only one among 'access/secret keys',
'use instance credentials', 'IAM role'");
+
+ if (accessKey.has_value() && secretKey.has_value()) {
+ return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
+ filesystems::awsString(accessKey.value()),
filesystems::awsString(secretKey.value()));
+ }
+
+ if (s3Config.useInstanceCredentials()) {
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ }
+
+ if (iamRole.has_value()) {
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ filesystems::awsString(iamRole.value()),
filesystems::awsString(s3Config.iamRoleSessionName()));
+ }
+
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+}
+
+std::shared_ptr<Aws::S3::S3Client> createWriteClient(const
std::shared_ptr<filesystems::S3Config>& s3Config) {
+ Aws::Client::ClientConfigurationInitValues initValues;
+ initValues.shouldDisableIMDS = !s3Config->useIMDS();
+ Aws::S3::S3ClientConfiguration clientConfig(initValues);
+ clientConfig.checksumConfig.requestChecksumCalculation =
Aws::Client::RequestChecksumCalculation::WHEN_REQUIRED;
+ clientConfig.checksumConfig.responseChecksumValidation =
Aws::Client::ResponseChecksumValidation::WHEN_REQUIRED;
+
+ if (s3Config->endpoint().has_value()) {
+ clientConfig.endpointOverride = s3Config->endpoint().value();
+ }
+ if (s3Config->endpointRegion().has_value()) {
+ clientConfig.region = s3Config->endpointRegion().value();
+ }
+ if (s3Config->useProxyFromEnv()) {
+ auto proxyConfig =
+
filesystems::S3ProxyConfigurationBuilder(s3Config->endpoint().has_value() ?
s3Config->endpoint().value() : "")
+ .useSsl(s3Config->useSSL())
+ .build();
+ if (proxyConfig.has_value()) {
+ clientConfig.proxyScheme =
Aws::Http::SchemeMapper::FromString(proxyConfig.value().scheme().c_str());
+ clientConfig.proxyHost =
filesystems::awsString(proxyConfig.value().host());
+ clientConfig.proxyPort = proxyConfig.value().port();
+ clientConfig.proxyUserName =
filesystems::awsString(proxyConfig.value().username());
+ clientConfig.proxyPassword =
filesystems::awsString(proxyConfig.value().password());
+ }
+ }
+
+ clientConfig.scheme = s3Config->useSSL() ? Aws::Http::Scheme::HTTPS :
Aws::Http::Scheme::HTTP;
+
+ if (s3Config->connectTimeout().has_value()) {
+ clientConfig.connectTimeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
velox::config::toDuration(s3Config->connectTimeout().value()))
+ .count();
+ }
+ if (s3Config->socketTimeout().has_value()) {
+ clientConfig.requestTimeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
velox::config::toDuration(s3Config->socketTimeout().value()))
+ .count();
+ }
+ if (s3Config->maxConnections().has_value()) {
+ clientConfig.maxConnections = s3Config->maxConnections().value();
+ }
+
+ auto retryStrategy = getRetryStrategy(s3Config);
+ if (retryStrategy.has_value()) {
+ clientConfig.retryStrategy = retryStrategy.value();
+ }
+
+ clientConfig.useVirtualAddressing = s3Config->useVirtualAddressing();
+ clientConfig.payloadSigningPolicy =
inferPayloadSign(s3Config->payloadSigningPolicy());
+
+ return std::make_shared<Aws::S3::S3Client>(
+ getCredentialsProvider(*s3Config), nullptr /* endpointProvider */,
clientConfig);
+}
+
+class GlutenS3WriteFile : public velox::WriteFile {
+ public:
+ GlutenS3WriteFile(
+ std::string_view path,
+ Aws::S3::S3Client* client,
+ velox::memory::MemoryPool* pool,
+ size_t minPartSize,
+ uint32_t maxConcurrentUploadNum,
+ uint32_t uploadThreads)
+ : client_(client),
+ pool_(pool),
+ minPartSize_(minPartSize),
+
uploadThrottle_(std::make_unique<folly::ThrottledLifoSem>(maxConcurrentUploadNum)),
+ uploadThreadPool_(uploadThreadPool(uploadThreads)) {
+ VELOX_CHECK_NOT_NULL(client_);
+ VELOX_CHECK_NOT_NULL(pool_);
+ filesystems::getBucketAndKeyFromPath(path, bucket_, key_);
+ currentPart_ =
std::make_unique<velox::dwio::common::DataBuffer<char>>(*pool_);
+ currentPart_->reserve(minPartSize_);
+ ensureObjectDoesNotExist();
+ createBucketIfMissing();
+ fileSize_ = 0;
+ }
+
+ ~GlutenS3WriteFile() override {
+ if (!uploadFutures_.empty()) {
+ try {
+ waitForAsyncUploads();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Failed while waiting for S3 async uploads: " <<
e.what();
+ }
+ }
+ }
+
+ void append(std::string_view data) override {
+ VELOX_CHECK(!closed(), "File is closed");
+ if (data.size() + currentPart_->size() >= minPartSize_) {
+ if (uploadState_.partNumber == 0) {
+ createMultipartUploadRequest();
+ }
+ upload(data);
+ } else {
+ currentPart_->unsafeAppend(data.data(), data.size());
+ }
+ fileSize_ += data.size();
+ }
+
+ void flush() override {
+ VELOX_CHECK(!closed(), "File is closed");
+ VELOX_CHECK_LT(currentPart_->size(), minPartSize_);
+ }
+
+ void close() override {
+ if (closed()) {
+ return;
+ }
+ if (uploadState_.partNumber == 0) {
+ putObjectRequest();
+ currentPart_->clear();
+ return;
+ }
+
+ RECORD_METRIC_VALUE(filesystems::kMetricS3StartedUploads);
+ uploadPart({currentPart_->data(), currentPart_->size()}, true);
+ waitForAsyncUploads();
+ VELOX_CHECK_EQ(uploadState_.partNumber,
uploadState_.completedParts.size());
+ completeMultipartUpload();
Review Comment:
If any async part upload or CompleteMultipartUpload fails, the code doesn’t
appear to abort the multipart upload. This can leave incomplete multipart
uploads behind (storage leakage / cleanup burden), and the destructor path only
logs and swallows exceptions rather than cleaning up.
##########
cpp/velox/filesystem/GlutenS3FileSystem.cc:
##########
@@ -29,6 +66,460 @@ namespace filesystems = facebook::velox::filesystems;
namespace {
+using namespace facebook::velox::filesystems;
+
+constexpr std::string_view kPartUploadAsync{"part-upload-async"};
+constexpr std::string_view kPartUploadAsyncLegacy{"uploadPartAsync"};
+constexpr std::string_view
kMaxConcurrentUploadNum{"max-concurrent-upload-num"};
+constexpr std::string_view kUploadThreads{"upload-threads"};
+constexpr uint32_t kDefaultMaxConcurrentUploadNum{4};
+constexpr uint32_t kDefaultUploadThreads{16};
+constexpr const char* kApplicationOctetStream = "application/octet-stream";
+
+std::string baseS3ConfigKey(std::string_view suffix) {
+ std::string key(filesystems::S3Config::kS3Prefix);
+ key.append(suffix);
+ return key;
+}
+
+std::string bucketS3ConfigKey(std::string_view bucketName, std::string_view
suffix) {
+ std::string key(filesystems::S3Config::kS3BucketPrefix);
+ key.append(bucketName);
+ key.append(".");
+ key.append(suffix);
+ return key;
+}
+
+std::optional<std::string> getS3ConfigValue(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix) {
+ if (auto value = config->get<std::string>(bucketS3ConfigKey(bucketName,
suffix))) {
+ return value;
+ }
+ return config->get<std::string>(baseS3ConfigKey(suffix));
+}
+
+std::optional<std::string> getS3ConfigValue(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix,
+ std::string_view legacySuffix) {
+ auto value = getS3ConfigValue(config, bucketName, suffix);
+ if (value.has_value()) {
+ return value;
+ }
+ return getS3ConfigValue(config, bucketName, legacySuffix);
+}
+
+uint32_t getUInt32S3Config(
+ const std::shared_ptr<const velox::config::ConfigBase>& config,
+ std::string_view bucketName,
+ std::string_view suffix,
+ uint32_t defaultValue) {
+ const auto value = getS3ConfigValue(config, bucketName, suffix);
+ return value.has_value() ? folly::to<uint32_t>(value.value()) : defaultValue;
+}
+
+// Supported values are "Always", "RequestDependent", "Never"(default).
+Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy
inferPayloadSign(std::string sign) {
+ std::transform(sign.begin(), sign.end(), sign.begin(), [](unsigned char c) {
return std::toupper(c); });
+ if (sign == "ALWAYS") {
+ return Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Always;
+ }
+ if (sign == "REQUESTDEPENDENT") {
+ return
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::RequestDependent;
+ }
+ return Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never;
+}
+
+std::optional<std::shared_ptr<Aws::Client::RetryStrategy>> getRetryStrategy(
+ const std::shared_ptr<filesystems::S3Config>& s3Config) {
+ auto retryMode = s3Config->retryMode();
+ auto maxAttempts = s3Config->maxAttempts();
+ if (!retryMode.has_value()) {
+ return std::nullopt;
+ }
+
+ if (retryMode.value() == "standard") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::StandardRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::StandardRetryStrategy>();
+ }
+
+ if (retryMode.value() == "adaptive") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::AdaptiveRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::AdaptiveRetryStrategy>();
+ }
+
+ if (retryMode.value() == "legacy") {
+ if (maxAttempts.has_value()) {
+ VELOX_USER_CHECK_GE(
+ maxAttempts.value(),
+ 0,
+ "Invalid configuration: specified 'hive.s3.max-attempts' value {} is
< 0.",
+ maxAttempts.value());
+ return
std::make_shared<Aws::Client::DefaultRetryStrategy>(maxAttempts.value());
+ }
+ return std::make_shared<Aws::Client::DefaultRetryStrategy>();
+ }
+
+ VELOX_USER_FAIL("Invalid retry mode for S3: {}", retryMode.value());
+ return std::nullopt;
+}
+
+std::shared_ptr<Aws::Auth::AWSCredentialsProvider>
getCredentialsProvider(const filesystems::S3Config& s3Config) {
+ VELOX_USER_CHECK(
+ !s3Config.credentialsProvider().has_value(),
+ "Gluten async S3 multipart upload does not support custom AWS
credentials providers yet.");
+
+ auto accessKey = s3Config.accessKey();
+ auto secretKey = s3Config.secretKey();
+ const auto iamRole = s3Config.iamRole();
+
+ int keyCount = accessKey.has_value() + secretKey.has_value();
+ VELOX_USER_CHECK(keyCount != 1, "Invalid configuration: both access key and
secret key must be specified");
+
+ int configCount =
+ (accessKey.has_value() && secretKey.has_value()) + iamRole.has_value() +
s3Config.useInstanceCredentials();
+ VELOX_USER_CHECK(
+ configCount <= 1,
+ "Invalid configuration: specify only one among 'access/secret keys',
'use instance credentials', 'IAM role'");
+
+ if (accessKey.has_value() && secretKey.has_value()) {
+ return std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(
+ filesystems::awsString(accessKey.value()),
filesystems::awsString(secretKey.value()));
+ }
+
+ if (s3Config.useInstanceCredentials()) {
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+ }
+
+ if (iamRole.has_value()) {
+ return std::make_shared<Aws::Auth::STSAssumeRoleCredentialsProvider>(
+ filesystems::awsString(iamRole.value()),
filesystems::awsString(s3Config.iamRoleSessionName()));
+ }
+
+ return std::make_shared<Aws::Auth::DefaultAWSCredentialsProviderChain>();
+}
+
+std::shared_ptr<Aws::S3::S3Client> createWriteClient(const
std::shared_ptr<filesystems::S3Config>& s3Config) {
+ Aws::Client::ClientConfigurationInitValues initValues;
+ initValues.shouldDisableIMDS = !s3Config->useIMDS();
+ Aws::S3::S3ClientConfiguration clientConfig(initValues);
+ clientConfig.checksumConfig.requestChecksumCalculation =
Aws::Client::RequestChecksumCalculation::WHEN_REQUIRED;
+ clientConfig.checksumConfig.responseChecksumValidation =
Aws::Client::ResponseChecksumValidation::WHEN_REQUIRED;
+
+ if (s3Config->endpoint().has_value()) {
+ clientConfig.endpointOverride = s3Config->endpoint().value();
+ }
+ if (s3Config->endpointRegion().has_value()) {
+ clientConfig.region = s3Config->endpointRegion().value();
+ }
+ if (s3Config->useProxyFromEnv()) {
+ auto proxyConfig =
+
filesystems::S3ProxyConfigurationBuilder(s3Config->endpoint().has_value() ?
s3Config->endpoint().value() : "")
+ .useSsl(s3Config->useSSL())
+ .build();
+ if (proxyConfig.has_value()) {
+ clientConfig.proxyScheme =
Aws::Http::SchemeMapper::FromString(proxyConfig.value().scheme().c_str());
+ clientConfig.proxyHost =
filesystems::awsString(proxyConfig.value().host());
+ clientConfig.proxyPort = proxyConfig.value().port();
+ clientConfig.proxyUserName =
filesystems::awsString(proxyConfig.value().username());
+ clientConfig.proxyPassword =
filesystems::awsString(proxyConfig.value().password());
+ }
+ }
+
+ clientConfig.scheme = s3Config->useSSL() ? Aws::Http::Scheme::HTTPS :
Aws::Http::Scheme::HTTP;
+
+ if (s3Config->connectTimeout().has_value()) {
+ clientConfig.connectTimeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
velox::config::toDuration(s3Config->connectTimeout().value()))
+ .count();
+ }
+ if (s3Config->socketTimeout().has_value()) {
+ clientConfig.requestTimeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
velox::config::toDuration(s3Config->socketTimeout().value()))
+ .count();
+ }
+ if (s3Config->maxConnections().has_value()) {
+ clientConfig.maxConnections = s3Config->maxConnections().value();
+ }
+
+ auto retryStrategy = getRetryStrategy(s3Config);
+ if (retryStrategy.has_value()) {
+ clientConfig.retryStrategy = retryStrategy.value();
+ }
+
+ clientConfig.useVirtualAddressing = s3Config->useVirtualAddressing();
+ clientConfig.payloadSigningPolicy =
inferPayloadSign(s3Config->payloadSigningPolicy());
+
+ return std::make_shared<Aws::S3::S3Client>(
+ getCredentialsProvider(*s3Config), nullptr /* endpointProvider */,
clientConfig);
+}
+
+class GlutenS3WriteFile : public velox::WriteFile {
+ public:
+ GlutenS3WriteFile(
+ std::string_view path,
+ Aws::S3::S3Client* client,
+ velox::memory::MemoryPool* pool,
+ size_t minPartSize,
+ uint32_t maxConcurrentUploadNum,
+ uint32_t uploadThreads)
+ : client_(client),
+ pool_(pool),
+ minPartSize_(minPartSize),
+
uploadThrottle_(std::make_unique<folly::ThrottledLifoSem>(maxConcurrentUploadNum)),
+ uploadThreadPool_(uploadThreadPool(uploadThreads)) {
+ VELOX_CHECK_NOT_NULL(client_);
+ VELOX_CHECK_NOT_NULL(pool_);
+ filesystems::getBucketAndKeyFromPath(path, bucket_, key_);
+ currentPart_ =
std::make_unique<velox::dwio::common::DataBuffer<char>>(*pool_);
+ currentPart_->reserve(minPartSize_);
+ ensureObjectDoesNotExist();
+ createBucketIfMissing();
+ fileSize_ = 0;
+ }
+
+ ~GlutenS3WriteFile() override {
+ if (!uploadFutures_.empty()) {
+ try {
+ waitForAsyncUploads();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Failed while waiting for S3 async uploads: " <<
e.what();
+ }
+ }
+ }
+
+ void append(std::string_view data) override {
+ VELOX_CHECK(!closed(), "File is closed");
+ if (data.size() + currentPart_->size() >= minPartSize_) {
+ if (uploadState_.partNumber == 0) {
+ createMultipartUploadRequest();
+ }
+ upload(data);
+ } else {
+ currentPart_->unsafeAppend(data.data(), data.size());
+ }
+ fileSize_ += data.size();
+ }
+
+ void flush() override {
+ VELOX_CHECK(!closed(), "File is closed");
+ VELOX_CHECK_LT(currentPart_->size(), minPartSize_);
+ }
+
+ void close() override {
+ if (closed()) {
+ return;
+ }
+ if (uploadState_.partNumber == 0) {
+ putObjectRequest();
+ currentPart_->clear();
+ return;
+ }
+
+ RECORD_METRIC_VALUE(filesystems::kMetricS3StartedUploads);
+ uploadPart({currentPart_->data(), currentPart_->size()}, true);
+ waitForAsyncUploads();
+ VELOX_CHECK_EQ(uploadState_.partNumber,
uploadState_.completedParts.size());
+ completeMultipartUpload();
+ currentPart_->clear();
+ }
+
+ uint64_t size() const override {
+ return fileSize_;
+ }
+
+ private:
+ struct UploadState {
+ Aws::Vector<Aws::S3::Model::CompletedPart> completedParts;
+ int64_t partNumber = 0;
+ Aws::String id;
+ };
+
+ static std::shared_ptr<folly::CPUThreadPoolExecutor>
uploadThreadPool(uint32_t uploadThreads) {
+ std::lock_guard<std::mutex> l(uploadThreadPoolMutex_);
+ if (!sharedUploadThreadPool_) {
+ sharedUploadThreadPool_ = std::make_shared<folly::CPUThreadPoolExecutor>(
+ uploadThreads,
std::make_shared<folly::NamedThreadFactory>("s3-upload-thread"));
Review Comment:
uploadThreads is read per bucket/file, but the implementation uses a single
static shared thread pool that is created only once per process. If different
buckets (or different files written later by the same long-lived process, e.g.
across queries) specify different hive.s3.upload-threads values, only the value
seen by the first GlutenS3WriteFile will take effect and the rest will be
silently ignored.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]