This is an automated email from the ASF dual-hosted git repository.

bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e6a68322c GH-37670: [C++] IO FileInterface extend from 
enable_shared_from_this (#37713)
0e6a68322c is described below

commit 0e6a68322c525aea84d8b7b127a8716bf484b227
Author: mwish <[email protected]>
AuthorDate: Wed Sep 20 00:51:41 2023 +0800

    GH-37670: [C++] IO FileInterface extend from enable_shared_from_this 
(#37713)
    
    
    
    ### Rationale for this change
    
    S3 `FlushAsync` might have a lifetime problem; this patch fixes that.
    
    ### What changes are included in this PR?
    
    1. Move `enable_shared_from_this` to `FileInterface`
    2. Update S3 `FlushAsync`
    3. Implement sync Flush to avoid calling `shared_from_this` in the dtor.
    
    ### Are these changes tested?
    
    no
    
    ### Are there any user-facing changes?
    
    no
    
    * Closes: #37670
    
    Lead-authored-by: mwish <[email protected]>
    Co-authored-by: mwish <[email protected]>
    Co-authored-by: Benjamin Kietzman <[email protected]>
    Signed-off-by: Benjamin Kietzman <[email protected]>
---
 cpp/src/arrow/filesystem/s3fs.cc      | 85 ++++++++++++++++++++---------------
 cpp/src/arrow/filesystem/s3fs_test.cc | 25 +++++++++++
 cpp/src/arrow/io/interfaces.cc        |  5 ++-
 cpp/src/arrow/io/interfaces.h         |  6 +--
 4 files changed, 79 insertions(+), 42 deletions(-)

diff --git a/cpp/src/arrow/filesystem/s3fs.cc b/cpp/src/arrow/filesystem/s3fs.cc
index 29f8882225..08fbcde6fd 100644
--- a/cpp/src/arrow/filesystem/s3fs.cc
+++ b/cpp/src/arrow/filesystem/s3fs.cc
@@ -1454,14 +1454,7 @@ class ObjectOutputStream final : public io::OutputStream 
{
 
   // OutputStream interface
 
-  Status Close() override {
-    auto fut = CloseAsync();
-    return fut.status();
-  }
-
-  Future<> CloseAsync() override {
-    if (closed_) return Status::OK();
-
+  Status EnsureReadyToFlushFromClose() {
     if (current_part_) {
       // Upload last part
       RETURN_NOT_OK(CommitCurrentPart());
@@ -1472,36 +1465,56 @@ class ObjectOutputStream final : public 
io::OutputStream {
       RETURN_NOT_OK(UploadPart("", 0));
     }
 
-    // Wait for in-progress uploads to finish (if async writes are enabled)
-    return FlushAsync().Then([this]() {
-      ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
+    return Status::OK();
+  }
 
-      // At this point, all part uploads have finished successfully
-      DCHECK_GT(part_number_, 1);
-      DCHECK_EQ(upload_state_->completed_parts.size(),
-                static_cast<size_t>(part_number_ - 1));
-
-      S3Model::CompletedMultipartUpload completed_upload;
-      completed_upload.SetParts(upload_state_->completed_parts);
-      S3Model::CompleteMultipartUploadRequest req;
-      req.SetBucket(ToAwsString(path_.bucket));
-      req.SetKey(ToAwsString(path_.key));
-      req.SetUploadId(upload_id_);
-      req.SetMultipartUpload(std::move(completed_upload));
-
-      auto outcome =
-          
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
-      if (!outcome.IsSuccess()) {
-        return ErrorToStatus(
-            std::forward_as_tuple("When completing multiple part upload for 
key '",
-                                  path_.key, "' in bucket '", path_.bucket, 
"': "),
-            "CompleteMultipartUpload", outcome.GetError());
-      }
+  Status FinishPartUploadAfterFlush() {
+    ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
 
-      holder_ = nullptr;
-      closed_ = true;
-      return Status::OK();
-    });
+    // At this point, all part uploads have finished successfully
+    DCHECK_GT(part_number_, 1);
+    DCHECK_EQ(upload_state_->completed_parts.size(),
+              static_cast<size_t>(part_number_ - 1));
+
+    S3Model::CompletedMultipartUpload completed_upload;
+    completed_upload.SetParts(upload_state_->completed_parts);
+    S3Model::CompleteMultipartUploadRequest req;
+    req.SetBucket(ToAwsString(path_.bucket));
+    req.SetKey(ToAwsString(path_.key));
+    req.SetUploadId(upload_id_);
+    req.SetMultipartUpload(std::move(completed_upload));
+
+    auto outcome =
+        
client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req));
+    if (!outcome.IsSuccess()) {
+      return ErrorToStatus(
+          std::forward_as_tuple("When completing multiple part upload for key 
'",
+                                path_.key, "' in bucket '", path_.bucket, "': 
"),
+          "CompleteMultipartUpload", outcome.GetError());
+    }
+
+    holder_ = nullptr;
+    closed_ = true;
+    return Status::OK();
+  }
+
+  Status Close() override {
+    if (closed_) return Status::OK();
+
+    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
+
+    RETURN_NOT_OK(Flush());
+    return FinishPartUploadAfterFlush();
+  }
+
+  Future<> CloseAsync() override {
+    if (closed_) return Status::OK();
+
+    RETURN_NOT_OK(EnsureReadyToFlushFromClose());
+
+    auto self = 
std::dynamic_pointer_cast<ObjectOutputStream>(shared_from_this());
+    // Wait for in-progress uploads to finish (if async writes are enabled)
+    return FlushAsync().Then([self]() { return 
self->FinishPartUploadAfterFlush(); });
   }
 
   bool closed() const override { return closed_; }
diff --git a/cpp/src/arrow/filesystem/s3fs_test.cc 
b/cpp/src/arrow/filesystem/s3fs_test.cc
index e9f14fde72..f88ee7eef9 100644
--- a/cpp/src/arrow/filesystem/s3fs_test.cc
+++ b/cpp/src/arrow/filesystem/s3fs_test.cc
@@ -590,6 +590,21 @@ class TestS3FS : public S3TestMixin {
     AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
   }
 
+  void TestOpenOutputStreamCloseAsyncDestructor() {
+    std::shared_ptr<io::OutputStream> stream;
+    ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile"));
+    ASSERT_OK(stream->Write("new data"));
+    // Destructor implicitly closes stream and completes the multipart upload.
+    // GH-37670: Testing it doesn't matter whether flush is triggered 
asynchronously
+    // after CloseAsync or synchronously after stream.reset() since we're just
+    // checking that `closeAsyncFut` keeps the stream alive until completion
+    // rather than segfaulting on a dangling stream
+    auto closeAsyncFut = stream->CloseAsync();
+    stream.reset();
+    ASSERT_OK(closeAsyncFut.MoveResult());
+    AssertObjectContents(client_.get(), "bucket", "somefile", "new data");
+  }
+
  protected:
   S3Options options_;
   std::shared_ptr<S3FileSystem> fs_;
@@ -1177,6 +1192,16 @@ TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) {
   TestOpenOutputStreamDestructor();
 }
 
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorBackgroundWrites) {
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
+TEST_F(TestS3FS, OpenOutputStreamAsyncDestructorSyncWrite) {
+  options_.background_writes = false;
+  MakeFileSystem();
+  TestOpenOutputStreamCloseAsyncDestructor();
+}
+
 TEST_F(TestS3FS, OpenOutputStreamMetadata) {
   std::shared_ptr<io::OutputStream> stream;
 
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index e7819e139f..d3229fd067 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -123,7 +123,8 @@ Result<std::shared_ptr<const KeyValueMetadata>> 
InputStream::ReadMetadata() {
 // executor
 Future<std::shared_ptr<const KeyValueMetadata>> InputStream::ReadMetadataAsync(
     const IOContext& ctx) {
-  auto self = shared_from_this();
+  std::shared_ptr<InputStream> self =
+      std::dynamic_pointer_cast<InputStream>(shared_from_this());
   return DeferNotOk(internal::SubmitIO(ctx, [self] { return 
self->ReadMetadata(); }));
 }
 
@@ -165,7 +166,7 @@ Result<std::shared_ptr<Buffer>> 
RandomAccessFile::ReadAt(int64_t position,
 Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext& 
ctx,
                                                             int64_t position,
                                                             int64_t nbytes) {
-  auto self = checked_pointer_cast<RandomAccessFile>(shared_from_this());
+  auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
   return DeferNotOk(internal::SubmitIO(
       ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes); 
}));
 }
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index dcbe4feb26..d2a11b7b6d 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -96,7 +96,7 @@ struct ARROW_EXPORT IOContext {
   StopToken stop_token_;
 };
 
-class ARROW_EXPORT FileInterface {
+class ARROW_EXPORT FileInterface : public 
std::enable_shared_from_this<FileInterface> {
  public:
   virtual ~FileInterface() = 0;
 
@@ -205,9 +205,7 @@ class ARROW_EXPORT OutputStream : virtual public 
FileInterface, public Writable
   OutputStream() = default;
 };
 
-class ARROW_EXPORT InputStream : virtual public FileInterface,
-                                 virtual public Readable,
-                                 public 
std::enable_shared_from_this<InputStream> {
+class ARROW_EXPORT InputStream : virtual public FileInterface, virtual public 
Readable {
  public:
   /// \brief Advance or skip stream indicated number of bytes
   /// \param[in] nbytes the number to move forward

Reply via email to