felipecrv commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1684611261


##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -933,32 +934,38 @@ Result<Blobs::Models::GetBlockListResult> GetBlockList(
   }
 }
 
-Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> 
block_blob_client,

Review Comment:
   Can you move `CommitBlockList` back here and define `StageBlock` right after 
it? For two reasons: simplifying this diff and keeping related functions closer 
to each other.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1098,91 @@ class ObjectAppendStream final : public 
io::OutputStream {
       // flush. This also avoids some unhandled errors when flushing in the 
destructor.
       return Status::OK();
     }
-    return CommitBlockList(block_blob_client_, block_ids_, 
commit_block_list_options_);
+
+    auto fut = FlushAsync();
+    RETURN_NOT_OK(fut.status());
+
+    return CommitBlockList();

Review Comment:
   I think it's better to keep the `CommitBlockList` version that takes 
parameters and have all callers acquire the lock before calling it.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -933,32 +934,38 @@ Result<Blobs::Models::GetBlockListResult> GetBlockList(
   }
 }
 
-Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> 
block_blob_client,
-                       const std::vector<std::string>& block_ids,
-                       const Blobs::CommitBlockListOptions& options) {
+Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const 
std::string& id,
+                  Core::IO::MemoryBodyStream& content) {
   try {
-    // CommitBlockList puts all block_ids in the latest element. That means in 
the case of
-    // overlapping block_ids the newly staged block ids will always replace the
-    // previously committed blocks.
-    // 
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
-    block_blob_client->CommitBlockList(block_ids, options);
+    block_blob_client->StageBlock(id, content);
   } catch (const Storage::StorageException& exception) {
     return ExceptionToStatus(
-        exception, "CommitBlockList failed for '", block_blob_client->GetUrl(),
-        "'. Committing is required to flush an output/append stream.");
+        exception, "StageBlock failed for '", block_blob_client->GetUrl(),
+        "' new_block_id: '", id,
+        "'. Staging new blocks is fundamental to streaming writes to blob 
storage.");
   }
+
   return Status::OK();
 }
 
+/**
+ * Writes will be buffered up to this size (in bytes) before actually 
uploading them.
+ */
+static constexpr int64_t kBlockUploadSize = 10 * 1024 * 1024;

Review Comment:
   Rename to `kBlockUploadSize` so the unit is never confused.



##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -933,32 +934,38 @@ Result<Blobs::Models::GetBlockListResult> GetBlockList(
   }
 }
 
-Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> 
block_blob_client,
-                       const std::vector<std::string>& block_ids,
-                       const Blobs::CommitBlockListOptions& options) {
+Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const 
std::string& id,
+                  Core::IO::MemoryBodyStream& content) {
   try {
-    // CommitBlockList puts all block_ids in the latest element. That means in 
the case of
-    // overlapping block_ids the newly staged block ids will always replace the
-    // previously committed blocks.
-    // 
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
-    block_blob_client->CommitBlockList(block_ids, options);
+    block_blob_client->StageBlock(id, content);
   } catch (const Storage::StorageException& exception) {
     return ExceptionToStatus(
-        exception, "CommitBlockList failed for '", block_blob_client->GetUrl(),
-        "'. Committing is required to flush an output/append stream.");
+        exception, "StageBlock failed for '", block_blob_client->GetUrl(),
+        "' new_block_id: '", id,
+        "'. Staging new blocks is fundamental to streaming writes to blob 
storage.");
   }
+
   return Status::OK();
 }
 
+/**
+ * Writes will be buffered up to this size (in bytes) before actually 
uploading them.
+ */

Review Comment:
   Arrow C++ project uses `///` comments for docstrings.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to