pitrou commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1718504949


##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -1471,6 +1486,107 @@ class TestAzureFileSystem : public ::testing::Test {
     arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
   }
 
+  void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+                            std::string_view expected) {
+    ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+    std::string contents;
+    std::shared_ptr<Buffer> buffer;
+    do {
+      ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+      contents.append(buffer->ToString());
+    } while (buffer->size() != 0);
+
+    EXPECT_EQ(expected, contents);
+  }
+
+  void TestOpenOutputStreamSmall() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+    const std::string_view expected(PreexistingData::kLoremIpsum);
+    ASSERT_OK(output->Write(expected));
+    ASSERT_OK(output->Close());
+
+    // Verify we can read the object back.
+    AssertObjectContents(fs.get(), path, expected);
+  }
+
+  void TestOpenOutputStreamLarge() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+    // Upload 5 MB, 4 MB und 2 MB and a very small write to test varying sizes
+    std::array<std::int64_t, 4> sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 
1024 * 1024,
+                                      2000};
+    std::array<std::string, 4> buffers{

Review Comment:
   Ditto here and in other places.



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -627,6 +628,16 @@ class TestAzureOptions : public ::testing::Test {
     ASSERT_EQ(path, "container/dir/blob");
   }
 
+  void TestFromUriEnableBackgroundWrites() {
+    std::string path;
+    ASSERT_OK_AND_ASSIGN(auto options,
+                         AzureOptions::FromUri(
+                             
"abfs://account:[email protected]:10000/container/dir/blob?"
+                             "background_writes=true",
+                             &path));
+    ASSERT_EQ(options.background_writes, true);

Review Comment:
   Can you also test that it's false in the other URI tests above?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -1471,6 +1486,107 @@ class TestAzureFileSystem : public ::testing::Test {
     arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
   }
 
+  void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+                            std::string_view expected) {
+    ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+    std::string contents;
+    std::shared_ptr<Buffer> buffer;
+    do {
+      ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+      contents.append(buffer->ToString());
+    } while (buffer->size() != 0);
+
+    EXPECT_EQ(expected, contents);
+  }
+
+  void TestOpenOutputStreamSmall() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+    const std::string_view expected(PreexistingData::kLoremIpsum);
+    ASSERT_OK(output->Write(expected));
+    ASSERT_OK(output->Close());
+
+    // Verify we can read the object back.
+    AssertObjectContents(fs.get(), path, expected);
+  }
+
+  void TestOpenOutputStreamLarge() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+    // Upload 5 MB, 4 MB und 2 MB and a very small write to test varying sizes
+    std::array<std::int64_t, 4> sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 
1024 * 1024,

Review Comment:
   Make this `std::vector<std::int64_t>` to avoid having to specify the size 
explicitly.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1121,114 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());

Review Comment:
   For the record, why is this outside of the `std::unique_lock` guard? Putting 
it inside would reduce code verbosity here.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1121,114 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
 
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flush async"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is 
nothing to
+      // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
 
-    const auto n_block_ids = block_ids_.size();
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    return pending_blocks_completed.Then([self = Self()] {
+      std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+      return CommitBlockList(self->block_blob_client_, 
self->upload_state_->block_ids,
+                             self->commit_block_list_options_);
+    });
+  }
+
+ private:
+  Status WriteBuffer() {
+    ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+    current_block_.reset();
+    current_block_size_ = 0;
+    return AppendBlock(buf);
+  }
+
+  Status DoWrite(const void* data, int64_t nbytes,
+                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (closed_) {
+      return Status::Invalid("Operation on closed stream");
+    }
+
+    const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+    auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+      data_ptr += offset;
+      nbytes -= offset;
+      pos_ += offset;
+      content_length_ += offset;
+    };
+
+    // Handle case where we have some bytes buffered from prior calls.
+    if (current_block_size_ > 0) {
+      // Try to fill current buffer
+      const int64_t to_copy =
+          std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+      RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+      current_block_size_ += to_copy;
+      advance_ptr(to_copy);
+
+      // If buffer isn't full, break
+      if (current_block_size_ < kBlockUploadSizeBytes) {
+        return Status::OK();
+      }
+
+      // Upload current buffer
+      RETURN_NOT_OK(WriteBuffer());
+    }
+
+    // We can upload chunks without copying them into a buffer
+    while (nbytes >= kBlockUploadSizeBytes) {
+      RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSizeBytes));
+      advance_ptr(kBlockUploadSizeBytes);
+    }
+
+    // Buffer remaining bytes
+    if (nbytes > 0) {
+      current_block_size_ = nbytes;
+
+      if (current_block_ == nullptr) {
+        ARROW_ASSIGN_OR_RAISE(
+            current_block_,
+            io::BufferOutputStream::Create(kBlockUploadSizeBytes, 
io_context_.pool()));
+      } else {
+        // Re-use the allocation from before.
+        RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, 
io_context_.pool()));
+      }
+
+      RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_));
+      pos_ += current_block_size_;
+      content_length_ += current_block_size_;
+    }
+
+    return Status::OK();
+  }
+
+  Status AppendBlock(std::shared_ptr<Buffer> buffer) {

Review Comment:
   Can you move this near the other `AppendBlock` overload?



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -1471,6 +1486,107 @@ class TestAzureFileSystem : public ::testing::Test {
     arrow::fs::AssertFileInfo(fs(), data.Path("dir/file0"), FileType::File);
   }
 
+  void AssertObjectContents(AzureFileSystem* fs, std::string_view path,
+                            std::string_view expected) {
+    ASSERT_OK_AND_ASSIGN(auto input, fs->OpenInputStream(std::string{path}));
+    std::string contents;
+    std::shared_ptr<Buffer> buffer;
+    do {
+      ASSERT_OK_AND_ASSIGN(buffer, input->Read(128 * 1024));
+      contents.append(buffer->ToString());
+    } while (buffer->size() != 0);
+
+    EXPECT_EQ(expected, contents);
+  }
+
+  void TestOpenOutputStreamSmall() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+    const std::string_view expected(PreexistingData::kLoremIpsum);
+    ASSERT_OK(output->Write(expected));
+    ASSERT_OK(output->Close());
+
+    // Verify we can read the object back.
+    AssertObjectContents(fs.get(), path, expected);
+  }
+
+  void TestOpenOutputStreamLarge() {
+    ASSERT_OK_AND_ASSIGN(auto fs, AzureFileSystem::Make(options_));
+
+    auto data = SetUpPreexistingData();
+    const auto path = data.ContainerPath("test-write-object");
+    ASSERT_OK_AND_ASSIGN(auto output, fs->OpenOutputStream(path, {}));
+
+    // Upload 5 MB, 4 MB und 2 MB and a very small write to test varying sizes
+    std::array<std::int64_t, 4> sizes{5 * 1024 * 1024, 4 * 1024 * 1024, 2 * 
1024 * 1024,
+                                      2000};
+    std::array<std::string, 4> buffers{

Review Comment:
   Arguably, this one could be constructed entirely programmatically (instead of 
hand-writing `sizes[0]` etc.) :-)



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1121,114 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
 
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flush async"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is 
nothing to
+      // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
 
-    const auto n_block_ids = block_ids_.size();
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    return pending_blocks_completed.Then([self = Self()] {
+      std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+      return CommitBlockList(self->block_blob_client_, 
self->upload_state_->block_ids,
+                             self->commit_block_list_options_);
+    });
+  }
+
+ private:
+  Status WriteBuffer() {

Review Comment:
   Call this `AppendCurrentBlock` or `WriteCurrentBlock` perhaps?



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1121,114 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
 
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flush async"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is 
nothing to
+      // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
 
-    const auto n_block_ids = block_ids_.size();
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    return pending_blocks_completed.Then([self = Self()] {
+      std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+      return CommitBlockList(self->block_blob_client_, 
self->upload_state_->block_ids,
+                             self->commit_block_list_options_);
+    });
+  }
+
+ private:
+  Status WriteBuffer() {
+    ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+    current_block_.reset();
+    current_block_size_ = 0;
+    return AppendBlock(buf);
+  }
+
+  Status DoWrite(const void* data, int64_t nbytes,
+                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (closed_) {
+      return Status::Invalid("Operation on closed stream");
+    }
+
+    const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+    auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+      data_ptr += offset;
+      nbytes -= offset;
+      pos_ += offset;
+      content_length_ += offset;
+    };
+
+    // Handle case where we have some bytes buffered from prior calls.
+    if (current_block_size_ > 0) {
+      // Try to fill current buffer
+      const int64_t to_copy =
+          std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+      RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+      current_block_size_ += to_copy;
+      advance_ptr(to_copy);
+
+      // If buffer isn't full, break
+      if (current_block_size_ < kBlockUploadSizeBytes) {
+        return Status::OK();
+      }
+
+      // Upload current buffer
+      RETURN_NOT_OK(WriteBuffer());
+    }
+
+    // We can upload chunks without copying them into a buffer
+    while (nbytes >= kBlockUploadSizeBytes) {
+      RETURN_NOT_OK(AppendBlock(data_ptr, kBlockUploadSizeBytes));

Review Comment:
   Is it actually better to append a block of `kBlockUploadSizeBytes` at a time 
here?



##########
cpp/src/arrow/filesystem/azurefs.h:
##########
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
   /// This will be ignored if non-empty metadata is passed to OpenOutputStream.
   std::shared_ptr<const KeyValueMetadata> default_metadata;
 
+  /// Whether OutputStream writes will be issued in the background, without 
blocking.
+  bool background_writes = false;

Review Comment:
   I would probably make this `true` by default as this is the common semantics 
across filesystems (including local).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to