This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 9e6acbe08a GH-40557: [C++] Use `PutObject` request for S3 in
OutputStream when only uploading small data (#41564)
9e6acbe08a is described below
commit 9e6acbe08a0ff5b569763d377aecc824362dd593
Author: Oliver Layer <[email protected]>
AuthorDate: Mon Jul 22 13:24:36 2024 +0200
GH-40557: [C++] Use `PutObject` request for S3 in OutputStream when only
uploading small data (#41564)
### Rationale for this change
See #40557. The previous implementation always issued a multipart upload, which
costs three round trips to S3 (`CreateMultipartUpload`, `UploadPart`,
`CompleteMultipartUpload`) instead of the single round trip of a `PutObject`
request.
### What changes are included in this PR?
Implement logic in the S3 `OutputStream` to use a single `PutObject` request if
the amount of data written stays below a threshold (`kMultiPartUploadThresholdSize`,
just under the 10 MiB part size) by the time the output stream is closed. If
more data is written, a multipart upload is triggered. This behavior is opt-in
through the new `S3Options::allow_delayed_open` option (default `false`).
Note: previously, opening the output stream was already expensive because the
`CreateMultipartUpload` request was sent at that point. With delayed open
enabled, opening the output stream becomes cheap, as we wait until some data is
written to decide which upload method to use. This requires [...]
### Are these changes tested?
No new test cases were added, as there are already tests for very small writes
and very large writes, which trigger both ways of uploading. The existing output
stream tests are now run for every combination of `background_writes` and
`allow_delayed_open`, so both upload paths should be covered.
### Are there any user-facing changes?
- Previously, opening the output stream failed if the bucket did not exist,
because the `CreateMultipartUpload` request was sent at that point. When
`allow_delayed_open` is enabled, that request is no longer sent upon opening the
stream, so the failure is instead reported when closing the stream, or when
writing once more data than the multipart threshold has accumulated. Replicating
the old behavior is not possible without sending another request, which would
defeat the purpose of this performance optimization; this is why the new
behavior is opt-in (`allow_delayed_open` defaults to `false`).
* GitHub Issue: #40557
Lead-authored-by: Oliver Layer <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/filesystem/s3fs.cc | 373 +++++++++++++++++++++++++++-------
cpp/src/arrow/filesystem/s3fs.h | 10 +
cpp/src/arrow/filesystem/s3fs_test.cc | 186 ++++++++++++-----
3 files changed, 440 insertions(+), 129 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 99cee19ed1..fd5b2e5be2 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -51,6 +51,7 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/core/client/RetryStrategy.h>
#include <aws/core/http/HttpResponse.h>
+#include <aws/core/utils/Outcome.h>
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/core/utils/stream/PreallocatedStreamBuf.h>
#include <aws/core/utils/xml/XmlSerializer.h>
@@ -74,6 +75,7 @@
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ObjectCannedACL.h>
#include <aws/s3/model/PutObjectRequest.h>
+#include <aws/s3/model/PutObjectResult.h>
#include <aws/s3/model/UploadPartRequest.h>
// AWS_SDK_VERSION_{MAJOR,MINOR,PATCH} are available since 1.9.7.
@@ -1335,7 +1337,7 @@ struct ObjectMetadataSetter {
static std::unordered_map<std::string, Setter> GetSetters() {
return {{"ACL", CannedACLSetter()},
{"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)},
- {"Content-Type", StringSetter(&ObjectRequest::SetContentType)},
+ {"Content-Type", ContentTypeSetter()},
{"Content-Language",
StringSetter(&ObjectRequest::SetContentLanguage)},
{"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}};
}
@@ -1365,6 +1367,16 @@ struct ObjectMetadataSetter {
};
}
+  /** We need a special setter here and cannot use `StringSetter` because, e.g. for
+   * the `PutObjectRequest`, the setter is located in the base class (instead of the
+   * concrete class). */
+ static Setter ContentTypeSetter() {
+ return [](const std::string& str, ObjectRequest* req) {
+ req->SetContentType(str);
+ return Status::OK();
+ };
+ }
+
static Result<S3Model::ObjectCannedACL> ParseACL(const std::string& v) {
if (v.empty()) {
return S3Model::ObjectCannedACL::NOT_SET;
@@ -1583,6 +1595,15 @@ class ObjectInputFile final : public
io::RandomAccessFile {
// (for rational, see: https://github.com/apache/arrow/issues/34363)
static constexpr int64_t kPartUploadSize = 10 * 1024 * 1024;
+// Above this threshold, use a multi-part upload instead of a single request upload.
+// Only relevant when the stream is not required to validate eagerly (on open) that
+// the bucket is writable, i.e. when `allow_delayed_open` is enabled.
+static constexpr int64_t kMultiPartUploadThresholdSize = kPartUploadSize - 1;
+
+static_assert(kMultiPartUploadThresholdSize < kPartUploadSize,
+              "Multi part upload threshold size must be strictly less than the actual "
+              "multi part upload part size.");
+
// An OutputStream that writes to a S3 object
class ObjectOutputStream final : public io::OutputStream {
protected:
@@ -1598,7 +1619,8 @@ class ObjectOutputStream final : public io::OutputStream {
path_(path),
metadata_(metadata),
default_metadata_(options.default_metadata),
- background_writes_(options.background_writes) {}
+ background_writes_(options.background_writes),
+ allow_delayed_open_(options.allow_delayed_open) {}
~ObjectOutputStream() override {
// For compliance with the rest of the IO stack, Close rather than Abort,
@@ -1606,29 +1628,47 @@ class ObjectOutputStream final : public
io::OutputStream {
io::internal::CloseFromDestructor(this);
}
+ template <typename ObjectRequest>
+ Status SetMetadataInRequest(ObjectRequest* request) {
+ std::shared_ptr<const KeyValueMetadata> metadata;
+
+ if (metadata_ && metadata_->size() != 0) {
+ metadata = metadata_;
+ } else if (default_metadata_ && default_metadata_->size() != 0) {
+ metadata = default_metadata_;
+ }
+
+ bool is_content_type_set{false};
+ if (metadata) {
+ RETURN_NOT_OK(SetObjectMetadata(metadata, request));
+
+ is_content_type_set = metadata->Contains("Content-Type");
+ }
+
+ if (!is_content_type_set) {
+ // If we do not set anything then the SDK will default to application/xml
+ // which confuses some tools
(https://github.com/apache/arrow/issues/11934)
+ // So we instead default to application/octet-stream which is less
misleading
+ request->SetContentType("application/octet-stream");
+ }
+
+ return Status::OK();
+ }
+
std::shared_ptr<ObjectOutputStream> Self() {
return std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
}
- Status Init() {
+ Status CreateMultipartUpload() {
+ DCHECK(ShouldBeMultipartUpload());
+
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
// Initiate the multi-part upload
S3Model::CreateMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
- if (metadata_ && metadata_->size() != 0) {
- RETURN_NOT_OK(SetObjectMetadata(metadata_, &req));
- } else if (default_metadata_ && default_metadata_->size() != 0) {
- RETURN_NOT_OK(SetObjectMetadata(default_metadata_, &req));
- }
-
- // If we do not set anything then the SDK will default to application/xml
- // which confuses some tools (https://github.com/apache/arrow/issues/11934)
- // So we instead default to application/octet-stream which is less
misleading
- if (!req.ContentTypeHasBeenSet()) {
- req.SetContentType("application/octet-stream");
- }
+ RETURN_NOT_OK(SetMetadataInRequest(&req));
auto outcome = client_lock.Move()->CreateMultipartUpload(req);
if (!outcome.IsSuccess()) {
@@ -1637,7 +1677,19 @@ class ObjectOutputStream final : public io::OutputStream
{
path_.key, "' in bucket '", path_.bucket, "':
"),
"CreateMultipartUpload", outcome.GetError());
}
- upload_id_ = outcome.GetResult().GetUploadId();
+ multipart_upload_id_ = outcome.GetResult().GetUploadId();
+
+ return Status::OK();
+ }
+
+ Status Init() {
+ // If we are allowed to do delayed I/O, we can use a single request to
upload the
+ // data. If not, we use a multi-part upload and initiate it here to
+ // sanitize that writing to the bucket is possible.
+ if (!allow_delayed_open_) {
+ RETURN_NOT_OK(CreateMultipartUpload());
+ }
+
upload_state_ = std::make_shared<UploadState>();
closed_ = false;
return Status::OK();
@@ -1648,42 +1700,62 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+ if (IsMultipartCreated()) {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
- S3Model::AbortMultipartUploadRequest req;
- req.SetBucket(ToAwsString(path_.bucket));
- req.SetKey(ToAwsString(path_.key));
- req.SetUploadId(upload_id_);
+ S3Model::AbortMultipartUploadRequest req;
+ req.SetBucket(ToAwsString(path_.bucket));
+ req.SetKey(ToAwsString(path_.key));
+ req.SetUploadId(multipart_upload_id_);
- auto outcome = client_lock.Move()->AbortMultipartUpload(req);
- if (!outcome.IsSuccess()) {
- return ErrorToStatus(
- std::forward_as_tuple("When aborting multiple part upload for key
'", path_.key,
- "' in bucket '", path_.bucket, "': "),
- "AbortMultipartUpload", outcome.GetError());
+ auto outcome = client_lock.Move()->AbortMultipartUpload(req);
+ if (!outcome.IsSuccess()) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When aborting multiple part upload for key
'",
+ path_.key, "' in bucket '", path_.bucket,
"': "),
+ "AbortMultipartUpload", outcome.GetError());
+ }
}
+
current_part_.reset();
holder_ = nullptr;
closed_ = true;
+
return Status::OK();
}
// OutputStream interface
+ bool ShouldBeMultipartUpload() const {
+ return pos_ > kMultiPartUploadThresholdSize || !allow_delayed_open_;
+ }
+
+ bool IsMultipartCreated() const { return !multipart_upload_id_.empty(); }
+
Status EnsureReadyToFlushFromClose() {
- if (current_part_) {
- // Upload last part
- RETURN_NOT_OK(CommitCurrentPart());
- }
+ if (ShouldBeMultipartUpload()) {
+ if (current_part_) {
+ // Upload last part
+ RETURN_NOT_OK(CommitCurrentPart());
+ }
- // S3 mandates at least one part, upload an empty one if necessary
- if (part_number_ == 1) {
- RETURN_NOT_OK(UploadPart("", 0));
+ // S3 mandates at least one part, upload an empty one if necessary
+ if (part_number_ == 1) {
+ RETURN_NOT_OK(UploadPart("", 0));
+ }
+ } else {
+ RETURN_NOT_OK(UploadUsingSingleRequest());
}
return Status::OK();
}
+ Status CleanupAfterClose() {
+ holder_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
Status FinishPartUploadAfterFlush() {
ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
@@ -1697,7 +1769,7 @@ class ObjectOutputStream final : public io::OutputStream {
S3Model::CompleteMultipartUploadRequest req;
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
- req.SetUploadId(upload_id_);
+ req.SetUploadId(multipart_upload_id_);
req.SetMultipartUpload(std::move(completed_upload));
auto outcome =
@@ -1709,8 +1781,6 @@ class ObjectOutputStream final : public io::OutputStream {
"CompleteMultipartUpload", outcome.GetError());
}
- holder_ = nullptr;
- closed_ = true;
return Status::OK();
}
@@ -1720,7 +1790,12 @@ class ObjectOutputStream final : public io::OutputStream
{
RETURN_NOT_OK(EnsureReadyToFlushFromClose());
RETURN_NOT_OK(Flush());
- return FinishPartUploadAfterFlush();
+
+ if (IsMultipartCreated()) {
+ RETURN_NOT_OK(FinishPartUploadAfterFlush());
+ }
+
+ return CleanupAfterClose();
}
Future<> CloseAsync() override {
@@ -1729,8 +1804,12 @@ class ObjectOutputStream final : public io::OutputStream
{
RETURN_NOT_OK(EnsureReadyToFlushFromClose());
// Wait for in-progress uploads to finish (if async writes are enabled)
- return FlushAsync().Then(
- [self = Self()]() { return self->FinishPartUploadAfterFlush(); });
+ return FlushAsync().Then([self = Self()]() {
+ if (self->IsMultipartCreated()) {
+ RETURN_NOT_OK(self->FinishPartUploadAfterFlush());
+ }
+ return self->CleanupAfterClose();
+ });
}
bool closed() const override { return closed_; }
@@ -1776,7 +1855,8 @@ class ObjectOutputStream final : public io::OutputStream {
return Status::OK();
}
- // Upload current buffer
+ // Upload current buffer. We're only reaching this point if we have
accumulated
+ // enough data to upload.
RETURN_NOT_OK(CommitCurrentPart());
}
@@ -1810,40 +1890,73 @@ class ObjectOutputStream final : public
io::OutputStream {
}
// Wait for background writes to finish
std::unique_lock<std::mutex> lock(upload_state_->mutex);
- return upload_state_->pending_parts_completed;
+ return upload_state_->pending_uploads_completed;
}
// Upload-related helpers
Status CommitCurrentPart() {
+ if (!IsMultipartCreated()) {
+ RETURN_NOT_OK(CreateMultipartUpload());
+ }
+
ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish());
current_part_.reset();
current_part_size_ = 0;
return UploadPart(buf);
}
- Status UploadPart(std::shared_ptr<Buffer> buffer) {
- return UploadPart(buffer->data(), buffer->size(), buffer);
+ Status UploadUsingSingleRequest() {
+ std::shared_ptr<Buffer> buf;
+ if (current_part_ == nullptr) {
+ // In case the stream is closed directly after it has been opened
without writing
+ // anything, we'll have to create an empty buffer.
+ buf = std::make_shared<Buffer>("");
+ } else {
+ ARROW_ASSIGN_OR_RAISE(buf, current_part_->Finish());
+ }
+
+ current_part_.reset();
+ current_part_size_ = 0;
+ return UploadUsingSingleRequest(buf);
}
- Status UploadPart(const void* data, int64_t nbytes,
- std::shared_ptr<Buffer> owned_buffer = nullptr) {
- S3Model::UploadPartRequest req;
+ template <typename RequestType, typename OutcomeType>
+ using UploadResultCallbackFunction =
+ std::function<Status(const RequestType& request,
std::shared_ptr<UploadState>,
+ int32_t part_number, OutcomeType outcome)>;
+
+ static Result<Aws::S3::Model::PutObjectOutcome> TriggerUploadRequest(
+ const Aws::S3::Model::PutObjectRequest& request,
+ const std::shared_ptr<S3ClientHolder>& holder) {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
+ return client_lock.Move()->PutObject(request);
+ }
+
+ static Result<Aws::S3::Model::UploadPartOutcome> TriggerUploadRequest(
+ const Aws::S3::Model::UploadPartRequest& request,
+ const std::shared_ptr<S3ClientHolder>& holder) {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
+ return client_lock.Move()->UploadPart(request);
+ }
+
+ template <typename RequestType, typename OutcomeType>
+ Status Upload(
+ RequestType&& req,
+ UploadResultCallbackFunction<RequestType, OutcomeType>
sync_result_callback,
+ UploadResultCallbackFunction<RequestType, OutcomeType>
async_result_callback,
+ const void* data, int64_t nbytes, std::shared_ptr<Buffer> owned_buffer =
nullptr) {
req.SetBucket(ToAwsString(path_.bucket));
req.SetKey(ToAwsString(path_.key));
- req.SetUploadId(upload_id_);
- req.SetPartNumber(part_number_);
+ req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
req.SetContentLength(nbytes);
if (!background_writes_) {
req.SetBody(std::make_shared<StringViewStream>(data, nbytes));
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
- auto outcome = client_lock.Move()->UploadPart(req);
- if (!outcome.IsSuccess()) {
- return UploadPartError(req, outcome);
- } else {
- AddCompletedPart(upload_state_, part_number_, outcome.GetResult());
- }
+
+ ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder_));
+
+ RETURN_NOT_OK(sync_result_callback(req, upload_state_, part_number_,
outcome));
} else {
// If the data isn't owned, make an immutable copy for the lifetime of
the closure
if (owned_buffer == nullptr) {
@@ -1858,19 +1971,18 @@ class ObjectOutputStream final : public
io::OutputStream {
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
- if (upload_state_->parts_in_progress++ == 0) {
- upload_state_->pending_parts_completed = Future<>::Make();
+ if (upload_state_->uploads_in_progress++ == 0) {
+ upload_state_->pending_uploads_completed = Future<>::Make();
}
}
// The closure keeps the buffer and the upload state alive
auto deferred = [owned_buffer, holder = holder_, req = std::move(req),
- state = upload_state_,
+ state = upload_state_, async_result_callback,
part_number = part_number_]() mutable -> Status {
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock());
- auto outcome = client_lock.Move()->UploadPart(req);
- HandleUploadOutcome(state, part_number, req, outcome);
- return Status::OK();
+ ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder));
+
+ return async_result_callback(req, state, part_number, outcome);
};
RETURN_NOT_OK(SubmitIO(io_context_, std::move(deferred)));
}
@@ -1880,9 +1992,118 @@ class ObjectOutputStream final : public
io::OutputStream {
return Status::OK();
}
- static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
- int part_number, const
S3Model::UploadPartRequest& req,
- const Result<S3Model::UploadPartOutcome>&
result) {
+ static Status UploadUsingSingleRequestError(
+ const Aws::S3::Model::PutObjectRequest& request,
+ const Aws::S3::Model::PutObjectOutcome& outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading object with key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "PutObject", outcome.GetError());
+ }
+
+ Status UploadUsingSingleRequest(std::shared_ptr<Buffer> buffer) {
+ return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer);
+ }
+
+  // Upload `nbytes` bytes at `data` as the whole object using a single PutObject
+  // request (synchronously, or in the background if `background_writes_` is set).
+  Status UploadUsingSingleRequest(const void* data, int64_t nbytes,
+                                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest& request,
+                                   std::shared_ptr<UploadState> state,
+                                   int32_t part_number,
+                                   Aws::S3::Model::PutObjectOutcome outcome) {
+      if (!outcome.IsSuccess()) {
+        return UploadUsingSingleRequestError(request, outcome);
+      }
+      return Status::OK();
+    };
+
+    // Forward the whole outcome rather than `outcome.GetResult()`: on failure the
+    // result is default-constructed and would be implicitly converted into a
+    // *successful* outcome, silently dropping the PutObject error.
+    auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest& request,
+                                    std::shared_ptr<UploadState> state,
+                                    int32_t part_number,
+                                    Aws::S3::Model::PutObjectOutcome outcome) {
+      HandleUploadUsingSingleRequestOutcome(state, request, std::move(outcome));
+      return Status::OK();
+    };
+
+    Aws::S3::Model::PutObjectRequest req{};
+    RETURN_NOT_OK(SetMetadataInRequest(&req));
+
+    return Upload<Aws::S3::Model::PutObjectRequest, Aws::S3::Model::PutObjectOutcome>(
+        std::move(req), std::move(sync_result_callback), std::move(async_result_callback),
+        data, nbytes, std::move(owned_buffer));
+  }
+
+ Status UploadPart(std::shared_ptr<Buffer> buffer) {
+ return UploadPart(buffer->data(), buffer->size(), buffer);
+ }
+
+ static Status UploadPartError(const Aws::S3::Model::UploadPartRequest&
request,
+ const Aws::S3::Model::UploadPartOutcome&
outcome) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When uploading part for key '",
request.GetKey(),
+ "' in bucket '", request.GetBucket(), "': "),
+ "UploadPart", outcome.GetError());
+ }
+
+  // Upload `nbytes` bytes at `data` as the next part of the multipart upload,
+  // creating the multipart upload first if that has not happened yet.
+  Status UploadPart(const void* data, int64_t nbytes,
+                    std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (!IsMultipartCreated()) {
+      RETURN_NOT_OK(CreateMultipartUpload());
+    }
+
+    Aws::S3::Model::UploadPartRequest req{};
+    req.SetPartNumber(part_number_);
+    req.SetUploadId(multipart_upload_id_);
+
+    auto sync_result_callback = [](const Aws::S3::Model::UploadPartRequest& request,
+                                   std::shared_ptr<UploadState> state,
+                                   int32_t part_number,
+                                   Aws::S3::Model::UploadPartOutcome outcome) {
+      if (!outcome.IsSuccess()) {
+        return UploadPartError(request, outcome);
+      } else {
+        AddCompletedPart(state, part_number, outcome.GetResult());
+      }
+
+      return Status::OK();
+    };
+
+    // Forward the whole outcome rather than `outcome.GetResult()`: on failure the
+    // result is default-constructed and would be implicitly converted into a
+    // *successful* outcome, hiding the error from HandleUploadPartOutcome.
+    auto async_result_callback = [](const Aws::S3::Model::UploadPartRequest& request,
+                                    std::shared_ptr<UploadState> state,
+                                    int32_t part_number,
+                                    Aws::S3::Model::UploadPartOutcome outcome) {
+      HandleUploadPartOutcome(state, part_number, request, std::move(outcome));
+      return Status::OK();
+    };
+
+    return Upload<Aws::S3::Model::UploadPartRequest, Aws::S3::Model::UploadPartOutcome>(
+        std::move(req), std::move(sync_result_callback), std::move(async_result_callback),
+        data, nbytes, std::move(owned_buffer));
+  }
+
+  // Completion handler for a background PutObject request: record any error in
+  // `state` and notify waiters once no upload is in progress anymore.
+  static void HandleUploadUsingSingleRequestOutcome(
+      const std::shared_ptr<UploadState>& state, const S3Model::PutObjectRequest& req,
+      const Result<S3Model::PutObjectOutcome>& result) {
+    std::unique_lock<std::mutex> lock(state->mutex);
+    if (!result.ok()) {
+      state->status &= result.status();
+    } else {
+      const auto& outcome = *result;
+      if (!outcome.IsSuccess()) {
+        state->status &= UploadUsingSingleRequestError(req, outcome);
+      }
+    }
+    // Upload() incremented `uploads_in_progress` before submitting this request,
+    // so balance it here exactly like HandleUploadPartOutcome does.
+    if (--state->uploads_in_progress == 0) {
+      // GH-41862: avoid potential deadlock if the Future's callback is called
+      // with the mutex taken. Snapshot the status while still holding the lock.
+      auto fut = state->pending_uploads_completed;
+      auto status = state->status;
+      lock.unlock();
+      fut.MarkFinished(std::move(status));
+    }
+  }
+
+ static void HandleUploadPartOutcome(const std::shared_ptr<UploadState>&
state,
+ int part_number,
+ const S3Model::UploadPartRequest& req,
+ const
Result<S3Model::UploadPartOutcome>& result) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!result.ok()) {
state->status &= result.status();
@@ -1895,10 +2116,10 @@ class ObjectOutputStream final : public
io::OutputStream {
}
}
// Notify completion
- if (--state->parts_in_progress == 0) {
+ if (--state->uploads_in_progress == 0) {
// GH-41862: avoid potential deadlock if the Future's callback is called
// with the mutex taken.
- auto fut = state->pending_parts_completed;
+ auto fut = state->pending_uploads_completed;
lock.unlock();
// State could be mutated concurrently if another thread writes to the
// stream, but in this case the Flush() call is only advisory anyway.
@@ -1923,14 +2144,6 @@ class ObjectOutputStream final : public io::OutputStream
{
state->completed_parts[slot] = std::move(part);
}
- static Status UploadPartError(const S3Model::UploadPartRequest& req,
- const S3Model::UploadPartOutcome& outcome) {
- return ErrorToStatus(
- std::forward_as_tuple("When uploading part for key '", req.GetKey(),
- "' in bucket '", req.GetBucket(), "': "),
- "UploadPart", outcome.GetError());
- }
-
protected:
std::shared_ptr<S3ClientHolder> holder_;
const io::IOContext io_context_;
@@ -1938,8 +2151,9 @@ class ObjectOutputStream final : public io::OutputStream {
const std::shared_ptr<const KeyValueMetadata> metadata_;
const std::shared_ptr<const KeyValueMetadata> default_metadata_;
const bool background_writes_;
+ const bool allow_delayed_open_;
- Aws::String upload_id_;
+ Aws::String multipart_upload_id_;
bool closed_ = true;
int64_t pos_ = 0;
int32_t part_number_ = 1;
@@ -1950,10 +2164,11 @@ class ObjectOutputStream final : public
io::OutputStream {
// in the completion handler.
struct UploadState {
std::mutex mutex;
+ // Only populated for multi-part uploads.
Aws::Vector<S3Model::CompletedPart> completed_parts;
- int64_t parts_in_progress = 0;
+ int64_t uploads_in_progress = 0;
Status status;
- Future<> pending_parts_completed = Future<>::MakeFinished(Status::OK());
+ Future<> pending_uploads_completed = Future<>::MakeFinished(Status::OK());
};
std::shared_ptr<UploadState> upload_state_;
};
diff --git a/cpp/src/arrow/filesystem/s3fs.h b/cpp/src/arrow/filesystem/s3fs.h
index fbbe9d0b3f..85d5ff8fed 100644
--- a/cpp/src/arrow/filesystem/s3fs.h
+++ b/cpp/src/arrow/filesystem/s3fs.h
@@ -177,6 +177,16 @@ struct ARROW_EXPORT S3Options {
/// to be true to address these scenarios.
bool check_directory_existence_before_creation = false;
+ /// Whether to allow file-open methods to return before the actual open.
+ ///
+ /// Enabling this may reduce the latency of `OpenInputStream`,
`OpenOutputStream`,
+ /// and similar methods, by reducing the number of roundtrips necessary. It
may also
+ /// allow usage of more efficient S3 APIs for small files.
+  /// The downside is that failure conditions such as attempting to open a file in a
+  /// non-existing bucket will only be reported when actual I/O is done (at worst,
+  /// when attempting to close the file).
+ bool allow_delayed_open = false;
+
/// \brief Default metadata for OpenOutputStream.
///
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc
b/cpp/src/arrow/filesystem/s3fs_test.cc
index 5a160a78ce..c33fa4f5aa 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -45,7 +45,9 @@
#include <aws/core/utils/logging/ConsoleLogSystem.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CreateBucketRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/GetObjectRequest.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/sts/STSClient.h>
@@ -450,25 +452,8 @@ class TestS3FS : public S3TestMixin {
req.SetBucket(ToAwsString("empty-bucket"));
ASSERT_OK(OutcomeToStatus("CreateBucket", client_->CreateBucket(req)));
}
- {
- Aws::S3::Model::PutObjectRequest req;
- req.SetBucket(ToAwsString("bucket"));
- req.SetKey(ToAwsString("emptydir/"));
- req.SetBody(std::make_shared<std::stringstream>(""));
- ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
- // NOTE: no need to create intermediate "directories" somedir/ and
- // somedir/subdir/
- req.SetKey(ToAwsString("somedir/subdir/subfile"));
- req.SetBody(std::make_shared<std::stringstream>("sub data"));
- ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
- req.SetKey(ToAwsString("somefile"));
- req.SetBody(std::make_shared<std::stringstream>("some data"));
- req.SetContentType("x-arrow/test");
- ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
- req.SetKey(ToAwsString("otherdir/1/2/3/otherfile"));
- req.SetBody(std::make_shared<std::stringstream>("other data"));
- ASSERT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
- }
+
+ ASSERT_OK(PopulateTestBucket());
}
void TearDown() override {
@@ -478,6 +463,72 @@ class TestS3FS : public S3TestMixin {
S3TestMixin::TearDown();
}
+ Status PopulateTestBucket() {
+ Aws::S3::Model::PutObjectRequest req;
+ req.SetBucket(ToAwsString("bucket"));
+ req.SetKey(ToAwsString("emptydir/"));
+ req.SetBody(std::make_shared<std::stringstream>(""));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ // NOTE: no need to create intermediate "directories" somedir/ and
+ // somedir/subdir/
+ req.SetKey(ToAwsString("somedir/subdir/subfile"));
+ req.SetBody(std::make_shared<std::stringstream>("sub data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("somefile"));
+ req.SetBody(std::make_shared<std::stringstream>("some data"));
+ req.SetContentType("x-arrow/test");
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+ req.SetKey(ToAwsString("otherdir/1/2/3/otherfile"));
+ req.SetBody(std::make_shared<std::stringstream>("other data"));
+ RETURN_NOT_OK(OutcomeToStatus("PutObject", client_->PutObject(req)));
+
+ return Status::OK();
+ }
+
+ Status RestoreTestBucket() {
+ // First empty the test bucket, and then re-upload initial test files.
+
+ Aws::S3::Model::Delete delete_object;
+ {
+ // Mostly taken from
+ //
https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/cpp/example_code/s3/list_objects.cpp
+ Aws::S3::Model::ListObjectsV2Request req;
+ req.SetBucket(Aws::String{"bucket"});
+
+ Aws::String continuation_token;
+ do {
+ if (!continuation_token.empty()) {
+ req.SetContinuationToken(continuation_token);
+ }
+
+ auto outcome = client_->ListObjectsV2(req);
+
+ if (!outcome.IsSuccess()) {
+ return OutcomeToStatus("ListObjectsV2", outcome);
+ } else {
+ Aws::Vector<Aws::S3::Model::Object> objects =
outcome.GetResult().GetContents();
+ for (const auto& object : objects) {
+ delete_object.AddObjects(
+ Aws::S3::Model::ObjectIdentifier().WithKey(object.GetKey()));
+ }
+
+ continuation_token = outcome.GetResult().GetNextContinuationToken();
+ }
+ } while (!continuation_token.empty());
+ }
+
+ {
+ Aws::S3::Model::DeleteObjectsRequest req;
+
+ req.SetDelete(std::move(delete_object));
+ req.SetBucket(Aws::String{"bucket"});
+
+ RETURN_NOT_OK(OutcomeToStatus("DeleteObjects",
client_->DeleteObjects(req)));
+ }
+
+ return PopulateTestBucket();
+ }
+
Result<std::shared_ptr<S3FileSystem>> MakeNewFileSystem(
io::IOContext io_context = io::default_io_context()) {
options_.ConfigureAccessKey(minio_->access_key(), minio_->secret_key());
@@ -518,11 +569,13 @@ class TestS3FS : public S3TestMixin {
AssertFileInfo(infos[11], "empty-bucket", FileType::Directory);
}
- void TestOpenOutputStream() {
+ void TestOpenOutputStream(bool allow_delayed_open) {
std::shared_ptr<io::OutputStream> stream;
- // Nonexistent
- ASSERT_RAISES(IOError,
fs_->OpenOutputStream("nonexistent-bucket/somefile"));
+    // Nonexistent bucket
+    if (!allow_delayed_open) {
+      // The bucket is validated eagerly when the stream is opened.
+      ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile"));
+    } else {
+      // Opening is deferred, so the failure must surface at the latest on Close().
+      ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("nonexistent-bucket/somefile"));
+      ASSERT_RAISES(IOError, stream->Close());
+    }
// URI
ASSERT_RAISES(Invalid, fs_->OpenOutputStream("s3:bucket/newfile1"));
@@ -843,8 +896,8 @@ TEST_F(TestS3FS, GetFileInfoGenerator) {
TEST_F(TestS3FS, GetFileInfoGeneratorStress) {
// This test is slow because it needs to create a bunch of seed files.
However, it is
- // the only test that stresses listing and deleting when there are more than
1000 files
- // and paging is required.
+ // the only test that stresses listing and deleting when there are more than
1000
+ // files and paging is required.
constexpr int32_t kNumDirs = 4;
constexpr int32_t kNumFilesPerDir = 512;
FileInfoVector expected_infos;
@@ -1235,50 +1288,83 @@ TEST_F(TestS3FS, OpenInputFile) {
ASSERT_RAISES(IOError, file->Seek(10));
}
-TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { TestOpenOutputStream(); }
+struct S3OptionsTestParameters {
+ bool background_writes{false};
+ bool allow_delayed_open{false};
-TEST_F(TestS3FS, OpenOutputStreamSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStream();
-}
+ void ApplyToS3Options(S3Options* options) const {
+ options->background_writes = background_writes;
+ options->allow_delayed_open = allow_delayed_open;
+ }
-TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) {
TestOpenOutputStreamAbort(); }
+ static std::vector<S3OptionsTestParameters> GetCartesianProduct() {
+ return {
+ S3OptionsTestParameters{true, false},
+ S3OptionsTestParameters{false, false},
+ S3OptionsTestParameters{true, true},
+ S3OptionsTestParameters{false, true},
+ };
+ }
-TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStreamAbort();
-}
+ std::string ToString() const {
+ return std::string("background_writes = ") + (background_writes ? "true" :
"false") +
+ ", allow_delayed_open = " + (allow_delayed_open ? "true" : "false");
+ }
+};
+
+TEST_F(TestS3FS, OpenOutputStream) {
+ for (const auto& combination :
S3OptionsTestParameters::GetCartesianProduct()) {
+ ARROW_SCOPED_TRACE(combination.ToString());
-TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) {
- TestOpenOutputStreamDestructor();
+ combination.ApplyToS3Options(&options_);
+ MakeFileSystem();
+ TestOpenOutputStream(combination.allow_delayed_open);
+ ASSERT_OK(RestoreTestBucket());
+ }
}
-TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStreamDestructor();
+TEST_F(TestS3FS, OpenOutputStreamAbort) {
+ for (const auto& combination :
S3OptionsTestParameters::GetCartesianProduct()) {
+ ARROW_SCOPED_TRACE(combination.ToString());
+
+ combination.ApplyToS3Options(&options_);
+ MakeFileSystem();
+ TestOpenOutputStreamAbort();
+ ASSERT_OK(RestoreTestBucket());
+ }
}
-TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
- TestOpenOutputStreamCloseAsyncDestructor();
+TEST_F(TestS3FS, OpenOutputStreamDestructor) {
+ for (const auto& combination :
S3OptionsTestParameters::GetCartesianProduct()) {
+ ARROW_SCOPED_TRACE(combination.ToString());
+
+ combination.ApplyToS3Options(&options_);
+ MakeFileSystem();
+ TestOpenOutputStreamDestructor();
+ ASSERT_OK(RestoreTestBucket());
+ }
}
-TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
- options_.background_writes = false;
- MakeFileSystem();
- TestOpenOutputStreamCloseAsyncDestructor();
+TEST_F(TestS3FS, OpenOutputStreamAsync) {
+  // Run the CloseAsync/destructor scenario for every option combination, restoring
+  // the bucket in between like the sibling parameterized tests do.
+  for (const auto& combination : S3OptionsTestParameters::GetCartesianProduct()) {
+    ARROW_SCOPED_TRACE(combination.ToString());
+
+    combination.ApplyToS3Options(&options_);
+    MakeFileSystem();
+    TestOpenOutputStreamCloseAsyncDestructor();
+    ASSERT_OK(RestoreTestBucket());
+  }
}
TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockBackgroundWrites) {
TestOpenOutputStreamCloseAsyncFutureDeadlock();
+ ASSERT_OK(RestoreTestBucket());
}
TEST_F(TestS3FS, OpenOutputStreamCloseAsyncFutureDeadlockSyncWrite) {
options_.background_writes = false;
MakeFileSystem();
TestOpenOutputStreamCloseAsyncFutureDeadlock();
+ ASSERT_OK(RestoreTestBucket());
}
TEST_F(TestS3FS, OpenOutputStreamMetadata) {
@@ -1396,8 +1482,8 @@ TEST_F(TestS3FS, CustomRetryStrategy) {
auto retry_strategy = std::make_shared<TestRetryStrategy>();
options_.retry_strategy = retry_strategy;
MakeFileSystem();
- // Attempt to open file that doesn't exist. Should hit
TestRetryStrategy::ShouldRetry()
- // 3 times before bubbling back up here.
+ // Attempt to open file that doesn't exist. Should hit
+ // TestRetryStrategy::ShouldRetry() 3 times before bubbling back up here.
ASSERT_RAISES(IOError, fs_->OpenInputStream("nonexistent-bucket/somefile"));
ASSERT_EQ(retry_strategy->GetErrorsEncountered().size(), 3);
for (const auto& error : retry_strategy->GetErrorsEncountered()) {