pitrou commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1723565123


##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1123,110 @@ class ObjectAppendStream final : public io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, commit_block_list_options_);
+
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    RETURN_NOT_OK(pending_blocks_completed.status());
+    std::unique_lock<std::mutex> lock(upload_state_->mutex);
+    return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+                           commit_block_list_options_);
   }
 
- private:
-  Status DoAppend(const void* data, int64_t nbytes,
-                  std::shared_ptr<Buffer> owned_buffer = nullptr) {
-    RETURN_NOT_OK(CheckClosed("append"));
-    auto append_data = reinterpret_cast<const uint8_t*>(data);
-    Core::IO::MemoryBodyStream block_content(append_data, nbytes);
-    if (block_content.Length() == 0) {
+  Future<> FlushAsync() {
+    RETURN_NOT_OK(CheckClosed("flush async"));
+    if (!initialised_) {
+      // If the stream has not been successfully initialized then there is nothing to
+      // flush. This also avoids some unhandled errors when flushing in the destructor.
       return Status::OK();
     }
 
-    const auto n_block_ids = block_ids_.size();
+    Future<> pending_blocks_completed;
+    {
+      std::unique_lock<std::mutex> lock(upload_state_->mutex);
+      pending_blocks_completed = upload_state_->pending_blocks_completed;
+    }
+
+    return pending_blocks_completed.Then([self = Self()] {
+      std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
+      return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids,
+                             self->commit_block_list_options_);
+    });
+  }
+
+ private:
+  Status AppendCurrentBlock() {
+    ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+    current_block_.reset();
+    current_block_size_ = 0;
+    return AppendBlock(buf);
+  }
+
+  Status DoWrite(const void* data, int64_t nbytes,
+                 std::shared_ptr<Buffer> owned_buffer = nullptr) {
+    if (closed_) {
+      return Status::Invalid("Operation on closed stream");
+    }
+
+    const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+    auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+      data_ptr += offset;
+      nbytes -= offset;
+      pos_ += offset;
+      content_length_ += offset;
+    };
+
+    // Handle case where we have some bytes buffered from prior calls.
+    if (current_block_size_ > 0) {
+      // Try to fill current buffer
+      const int64_t to_copy =
+          std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
+      RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
+      current_block_size_ += to_copy;
+      advance_ptr(to_copy);
+
+      // If the buffer isn't full, return and wait for more data
+      if (current_block_size_ < kBlockUploadSizeBytes) {
+        return Status::OK();
+      }
+
+      // Upload current buffer
+      RETURN_NOT_OK(AppendCurrentBlock());
+    }
+
+    // We can upload chunks without copying them into a buffer
+    while (nbytes >= kMaxBlockSizeBytes) {
+      RETURN_NOT_OK(AppendBlock(data_ptr, kMaxBlockSizeBytes));
+      advance_ptr(kMaxBlockSizeBytes);
+    }

Review Comment:
   Hmm, it seems this will not upload any chunk smaller than `kMaxBlockSizeBytes`? Perhaps instead something like:
   ```suggestion
       while (nbytes >= kBlockUploadSizeBytes) {
         const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes);
         RETURN_NOT_OK(AppendBlock(data_ptr, upload_size));
         advance_ptr(upload_size);
       }
   ```
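
   For context, a standalone sketch of the behavioral difference (with made-up thresholds, not the real azurefs.cc constants): the loop as written never uploads a chunk sized between the two thresholds, while the suggested loop sends it out directly.
   ```cpp
   #include <algorithm>
   #include <cstdint>
   #include <iostream>
   #include <vector>

   // Illustrative thresholds only; the real values in azurefs.cc are much larger.
   constexpr int64_t kBlockUploadSizeBytes = 4;  // buffer-flush threshold
   constexpr int64_t kMaxBlockSizeBytes = 8;     // per-block upload cap

   // Stand-in for AppendBlock(): just records the size of each "uploaded" block.
   static std::vector<int64_t> uploaded;
   void AppendBlock(const uint8_t* /*data*/, int64_t nbytes) { uploaded.push_back(nbytes); }

   // Loop as written in the PR: chunks in [kBlockUploadSizeBytes, kMaxBlockSizeBytes)
   // fall through untouched and must be buffered. Returns the leftover byte count.
   int64_t WriteOriginal(const uint8_t* data, int64_t nbytes) {
     while (nbytes >= kMaxBlockSizeBytes) {
       AppendBlock(data, kMaxBlockSizeBytes);
       data += kMaxBlockSizeBytes;
       nbytes -= kMaxBlockSizeBytes;
     }
     return nbytes;
   }

   // Suggested loop: anything at or above the flush threshold is uploaded directly.
   int64_t WriteSuggested(const uint8_t* data, int64_t nbytes) {
     while (nbytes >= kBlockUploadSizeBytes) {
       const int64_t upload_size = std::min(nbytes, kMaxBlockSizeBytes);
       AppendBlock(data, upload_size);
       data += upload_size;
       nbytes -= upload_size;
     }
     return nbytes;
   }

   int main() {
     uint8_t buf[6] = {};
     std::cout << "original leftover:  " << WriteOriginal(buf, 6) << "\n";   // 6 (nothing uploaded)
     std::cout << "suggested leftover: " << WriteSuggested(buf, 6) << "\n";  // 0 (one 6-byte block)
   }
   ```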



##########
cpp/src/arrow/filesystem/azurefs_test.cc:
##########
@@ -46,13 +46,15 @@
 #include <azure/storage/blobs.hpp>
 #include <azure/storage/common/storage_credential.hpp>
 #include <azure/storage/files/datalake.hpp>
+#include <vector>

Review Comment:
   Nit: can we keep stdlib includes together? i.e. move this above, just below `#include <string>`.
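
   i.e. (assuming `#include <string>` sits at the end of the existing stdlib block) the includes would end up roughly as:
   ```cpp
   #include <string>
   #include <vector>

   #include <azure/storage/blobs.hpp>
   #include <azure/storage/common/storage_credential.hpp>
   #include <azure/storage/files/datalake.hpp>
   ```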


