Tom-Newton commented on code in PR #38780:
URL: https://github.com/apache/arrow/pull/38780#discussion_r1398390141
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile {
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+        const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())), s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+    std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client) {
+ try {
+ return block_blob_client->GetBlockList().Value;
+  } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+  for (const auto& key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+    const std::vector<std::string>& block_ids,
+    const Azure::Storage::Metadata& metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+    // CommitBlockList puts all block_ids in the latest element. That means in the
+    // case of overlapping block_ids the newly staged block ids will always replace
+    // the previously committed blocks.
+    // https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+      std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+    } else if (options.default_metadata && options.default_metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
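+      // Size unknown: probe the service. An existing blob tells us its current
+      // length; a NotFound response means we must create a new empty block blob
+      // before blocks can be appended.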
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+        if (exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+      for (const auto& block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+    // The input is arbitrary bytes, not a NUL-terminated string, so use the
+    // caller-supplied length rather than strlen.
+    auto append_data = reinterpret_cast<const uint8_t*>(data);
+    auto block_content = Azure::Core::IO::MemoryBodyStream(
+        append_data, static_cast<size_t>(nbytes));
+ if (block_content.Length() == 0) {
+ return Status::OK();
+ }
+
+ auto size = block_ids_.size();
+
+    // New block ids must always be distinct from the existing block ids. Otherwise
+    // we will accidentally replace the content of existing blocks, causing
+    // corruption. We will use monotonically increasing integers.
+ std::string new_block_id = std::to_string(size);
+
+ // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
+ const size_t target_number_of_digits = 5;
+ int required_padding_digits =
+        target_number_of_digits - std::min(target_number_of_digits, new_block_id.size());
+ new_block_id.insert(0, required_padding_digits, '0');
+ new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id
collisions with
+ // blocks created by other applications.
+ new_block_id = Azure::Core::Convert::Base64Encode(
+ std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
+
Review Comment:
If we want to be 100% confident of avoiding clashes then yes, but I think the
current solution is a good compromise.
The risk should be zero when using `OpenOutputStream`, because every block ID
is created by this same scheme, using monotonically increasing integers. The
risk with `OpenAppendStream` is that previously committed blocks may have
unusual names that conflict with the IDs we generate. For example, if some
other writer committed a block blob consisting of one block named
`00002-arrow`, that would conflict once this writer appends two additional
blocks, corrupting the blob (see the sketch below). I think this is extremely
unlikely, so this seems like a good option. Additionally, `OpenAppendStream`
is not implemented at all for S3 and GCS, so presumably it's not used much.
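
To make the clash concrete, here is a minimal standalone sketch (no Azure SDK
required; `MakeArrowBlockId` is a hypothetical helper mirroring the
padding-and-suffix logic from `DoAppend`, with the final base64 step omitted
since it does not affect uniqueness):

```cpp
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Mirrors the scheme above: zero-pad the block index to 5 digits and append
// "-arrow" to reduce collisions with blocks from other applications.
std::string MakeArrowBlockId(size_t index) {
  std::string id = std::to_string(index);
  const size_t target_number_of_digits = 5;  // Azure allows at most 50,000 blocks.
  id.insert(0, target_number_of_digits - std::min(target_number_of_digits, id.size()),
            '0');
  return id + "-arrow";
}

int main() {
  // Suppose some other writer committed a blob whose single block happens to
  // be named like this scheme's third ID.
  std::vector<std::string> committed = {"00002-arrow"};

  // This writer now appends two blocks; new IDs start at the committed count.
  for (int i = 0; i < 2; ++i) {
    std::string new_id = MakeArrowBlockId(committed.size());
    bool clash =
        std::find(committed.begin(), committed.end(), new_id) != committed.end();
    std::cout << new_id << (clash ? "  <-- replaces an existing block" : "") << "\n";
    committed.push_back(new_id);
  }
  // Prints:
  //   00001-arrow
  //   00002-arrow  <-- replaces an existing block
  return 0;
}
```

Staging and committing that second block under an ID that already names a
committed block silently replaces the existing block's content, which is the
corruption described above.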
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]