This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e6a68322c GH-37670: [C++] IO FileInterface extend from
enable_shared_from_this (#37713)
0e6a68322c is described below
commit 0e6a68322c525aea84d8b7b127a8716bf484b227
Author: mwish <[email protected]>
AuthorDate: Wed Sep 20 00:51:41 2023 +0800
GH-37670: [C++] IO FileInterface extend from enable_shared_from_this
(#37713)
### Rationale for this change
S3 `FlushAsync` might have a lifetime problem; this patch fixes that.
### What changes are included in this PR?
1. Move `enable_shared_from_this` to `FileInterface`
2. Update S3 `FlushAsync`
3. Implement sync Flush to avoid calling `shared_from_this` in the dtor.
### Are these changes tested?
no
### Are there any user-facing changes?
no
* Closes: #37670
Lead-authored-by: mwish <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/filesystem/s3fs.cc | 85 ++++++++++++++++++++---------------
cpp/src/arrow/filesystem/s3fs_test.cc | 25 +++++++++++
cpp/src/arrow/io/interfaces.cc | 5 ++-
cpp/src/arrow/io/interfaces.h | 6 +--
4 files changed, 79 insertions(+), 42 deletions(-)
diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 29f8882225..08fbcde6fd 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1454,14 +1454,7 @@ class ObjectOutputStream final : public io::OutputStream
{
// OutputStream interface
- Status Close() override {
- auto fut = CloseAsync();
- return fut.status();
- }
-
- Future<> CloseAsync() override {
- if (closed_) return Status::OK();
-
+ Status EnsureReadyToFlushFromClose() {
if (current_part_) {
// Upload last part
RETURN_NOT_OK(CommitCurrentPart());
@@ -1472,36 +1465,56 @@ class ObjectOutputStream final : public
io::OutputStream {
RETURN_NOT_OK(UploadPart("", 0));
}
- // Wait for in-progress uploads to finish (if async writes are enabled)
- return FlushAsync().Then([this]() {
- ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+ return Status::OK();
+ }
- // At this point, all part uploads have finished successfully
- DCHECK_GT(part_number_, 1);
- DCHECK_EQ(upload_state_->completed_parts.size(),
- static_cast<size_t>(part_number_ - 1));
-
- S3Model::CompletedMultipartUpload completed_upload;
- completed_upload.SetParts(upload_state_->completed_parts);
- S3Model::CompleteMultipartUploadRequest req;
- req.SetBucket(ToAwsString(path_.bucket));
- req.SetKey(ToAwsString(path_.key));
- req.SetUploadId(upload_id_);
- req.SetMultipartUpload(std::move(completed_upload));
-
- auto outcome =
-
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
- if (!outcome.IsSuccess()) {
- return ErrorToStatus(
- std::forward_as_tuple("When completing multiple part upload for
key '",
- path_.key, "' in bucket '", path_.bucket,
"': "),
- "CompleteMultipartUpload", outcome.GetError());
- }
+ Status FinishPartUploadAfterFlush() {
+ ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
- holder_ = nullptr;
- closed_ = true;
- return Status::OK();
- });
+ // At this point, all part uploads have finished successfully
+ DCHECK_GT(part_number_, 1);
+ DCHECK_EQ(upload_state_->completed_parts.size(),
+ static_cast<size_t>(part_number_ - 1));
+
+ S3Model::CompletedMultipartUpload completed_upload;
+ completed_upload.SetParts(upload_state_->completed_parts);
+ S3Model::CompleteMultipartUploadRequest req;
+ req.SetBucket(ToAwsString(path_.bucket));
+ req.SetKey(ToAwsString(path_.key));
+ req.SetUploadId(upload_id_);
+ req.SetMultipartUpload(std::move(completed_upload));
+
+ auto outcome =
+
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
+ if (!outcome.IsSuccess()) {
+ return ErrorToStatus(
+ std::forward_as_tuple("When completing multiple part upload for key
'",
+ path_.key, "' in bucket '", path_.bucket, "':
"),
+ "CompleteMultipartUpload", outcome.GetError());
+ }
+
+ holder_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) return Status::OK();
+
+ RETURN_NOT_OK(EnsureReadyToFlushFromClose());
+
+ RETURN_NOT_OK(Flush());
+ return FinishPartUploadAfterFlush();
+ }
+
+ Future<> CloseAsync() override {
+ if (closed_) return Status::OK();
+
+ RETURN_NOT_OK(EnsureReadyToFlushFromClose());
+
+ auto self =
std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
+ // Wait for in-progress uploads to finish (if async writes are enabled)
+ return FlushAsync().Then([self]() { return
self->FinishPartUploadAfterFlush(); });
}
bool closed() const override { return closed_; }
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc
b/cpp/src/arrow/filesystem/s3fs_test.cc
index e9f14fde72..f88ee7eef9 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -590,6 +590,21 @@ class TestS3FS : public S3TestMixin {
AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
}
+ void TestOpenOutputStreamCloseAsyncDestructor() {
+ std::shared_ptr<io::OutputStream> stream;
+ ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+ ASSERT_OK(stream->Write("new data"));
+ // Destructor implicitly closes stream and completes the multipart upload.
+ // GH-37670: Testing it doesn't matter whether flush is triggered
asynchronously
+ // after CloseAsync or synchronously after stream.reset() since we're just
+ // checking that `closeAsyncFut` keeps the stream alive until completion
+ // rather than segfaulting on a dangling stream
+ auto closeAsyncFut = stream->CloseAsync();
+ stream.reset();
+ ASSERT_OK(closeAsyncFut.MoveResult());
+ AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
+ }
+
protected:
S3Options options_;
std::shared_ptr<S3FileSystem> fs_;
@@ -1177,6 +1192,16 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
TestOpenOutputStreamDestructor();
}
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
+ TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
+ options_.background_writes = false;
+ MakeFileSystem();
+ TestOpenOutputStreamCloseAsyncDestructor();
+}
+
TEST_F(TestS3FS, OpenOutputStreamMetadata) {
std::shared_ptr<io::OutputStream> stream;
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index e7819e139f..d3229fd067 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -123,7 +123,8 @@ Result<std::shared_ptr<const KeyValueMetadata>>
InputStream::ReadMetadata() {
// executor
Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync(
const IOContext& ctx) {
- auto self = shared_from_this();
+ std::shared_ptr<InputStream> self =
+ std::dynamic_pointer_cast<InputStream>(shared_from_this());
return DeferNotOk(internal::SubmitIO(ctx, [self] { return
self->ReadMetadata(); }));
}
@@ -165,7 +166,7 @@ Result<std::shared_ptr<Buffer>>
RandomAccessFile::ReadAt(int64_t position,
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext&
ctx,
int64_t position,
int64_t nbytes) {
- auto self = checked_pointer_cast<RandomAccessFile>(shared_from_this());
+ auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
return DeferNotOk(internal::SubmitIO(
ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes);
}));
}
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index dcbe4feb26..d2a11b7b6d 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -96,7 +96,7 @@ struct ARROW_EXPORT IOContext {
StopToken stop_token_;
};
-class ARROW_EXPORT FileInterface {
+class ARROW_EXPORT FileInterface : public
std::enable_shared_from_this<FileInterface> {
public:
virtual ~FileInterface() = 0;
@@ -205,9 +205,7 @@ class ARROW_EXPORT OutputStream : virtual public
FileInterface, public Writable
OutputStream() = default;
};
-class ARROW_EXPORT InputStream : virtual public FileInterface,
- virtual public Readable,
- public
std::enable_shared_from_this<InputStream> {
+class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public
Readable {
public:
/// \brief Advance or skip stream indicated number of bytes
/// \param[in] nbytes the number to move forward