Tom-Newton commented on code in PR #43096:
URL: https://github.com/apache/arrow/pull/43096#discussion_r1702676031
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1111,95 @@ class ObjectAppendStream final : public
io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the
destructor.
return Status::OK();
}
- return CommitBlockList(block_blob_client_, block_ids_,
commit_block_list_options_);
+
+ auto fut = FlushAsync();
+ RETURN_NOT_OK(fut.status());
+
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ return CommitBlockList(block_blob_client_, upload_state_->block_ids,
Review Comment:
I'm confused about how we are committing the block list when flushing
asynchronously. `CommitBlockList` is critical for the blob to actually be
updated, but it looks like it's only used in the sync Flush.
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1111,95 @@ class ObjectAppendStream final : public
io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the
destructor.
return Status::OK();
}
- return CommitBlockList(block_blob_client_, block_ids_,
commit_block_list_options_);
+
+ auto fut = FlushAsync();
+ RETURN_NOT_OK(fut.status());
+
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+ commit_block_list_options_);
+ }
+
+ Future<> FlushAsync() {
+ // Wait for background writes to finish
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ return upload_state_->pending_blocks_completed;
}
private:
- Status DoAppend(const void* data, int64_t nbytes,
- std::shared_ptr<Buffer> owned_buffer = nullptr) {
- RETURN_NOT_OK(CheckClosed("append"));
- auto append_data = reinterpret_cast<const uint8_t*>(data);
- Core::IO::MemoryBodyStream block_content(append_data, nbytes);
- if (block_content.Length() == 0) {
- return Status::OK();
+ Status WriteBuffer() {
+ ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+ current_block_.reset();
+ current_block_size_ = 0;
+ return AppendBlock(buf);
+ }
+
+ Status DoWrite(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
Review Comment:
Am I right in thinking this `owned_buffer` is not used?
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1098,88 @@ class ObjectAppendStream final : public
io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the
destructor.
return Status::OK();
}
- return CommitBlockList(block_blob_client_, block_ids_,
commit_block_list_options_);
+
+ auto fut = FlushAsync();
+ RETURN_NOT_OK(fut.status());
+
+ return CommitBlockList();
+ }
+
+ Future<> FlushAsync() {
+ RETURN_NOT_OK(CheckClosed("flush"));
Review Comment:
I think I agree with @kou. The S3 filesystem implements `Flush()` as
```
Status Flush() override {
auto fut = FlushAsync();
return fut.status();
}
```
I think it would be nice if we could do the same because it ensures that
`Flush` and `FlushAsync` have the same behaviour.
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1066,20 +1111,95 @@ class ObjectAppendStream final : public
io::OutputStream {
// flush. This also avoids some unhandled errors when flushing in the
destructor.
return Status::OK();
}
- return CommitBlockList(block_blob_client_, block_ids_,
commit_block_list_options_);
+
+ auto fut = FlushAsync();
+ RETURN_NOT_OK(fut.status());
+
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ return CommitBlockList(block_blob_client_, upload_state_->block_ids,
+ commit_block_list_options_);
+ }
+
+ Future<> FlushAsync() {
+ // Wait for background writes to finish
+ std::unique_lock<std::mutex> lock(upload_state_->mutex);
+ return upload_state_->pending_blocks_completed;
}
private:
- Status DoAppend(const void* data, int64_t nbytes,
- std::shared_ptr<Buffer> owned_buffer = nullptr) {
- RETURN_NOT_OK(CheckClosed("append"));
- auto append_data = reinterpret_cast<const uint8_t*>(data);
- Core::IO::MemoryBodyStream block_content(append_data, nbytes);
- if (block_content.Length() == 0) {
- return Status::OK();
+ Status WriteBuffer() {
+ ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
+ current_block_.reset();
+ current_block_size_ = 0;
+ return AppendBlock(buf);
+ }
+
+ Status DoWrite(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ if (closed_) {
+ return Status::Invalid("Operation on closed stream");
+ }
+
+ const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
+ auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
+ data_ptr += offset;
+ nbytes -= offset;
+ pos_ += offset;
+ content_length_ += offset;
Review Comment:
Will `DoWrite` be used concurrently? If so, do we need to worry about
synchronising changes to `pos_` and `content_length_`?
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -1031,12 +1057,31 @@ class ObjectAppendStream final : public
io::OutputStream {
if (closed_) {
return Status::OK();
}
+
+ if (current_block_) {
+ // Upload remaining buffer
+ RETURN_NOT_OK(WriteBuffer());
+ }
+
RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}
+ Future<> CloseAsync() override {
+ if (closed_) {
+ return Status::OK();
+ }
+
+ if (current_block_) {
+ // Upload remaining buffer
+ RETURN_NOT_OK(WriteBuffer());
+ }
+
+ return FlushAsync();
Review Comment:
I think we should set `block_blob_client_ = nullptr` and `closed_ = true`,
similar to the sync close and the S3 filesystem:
https://github.com/OliLay/arrow/blob/2ffbcb3e644290b3a66270392c50593d59d5bab8/cpp/src/arrow/filesystem/s3fs.cc#L1712-L1713
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]