kou commented on code in PR #38780:
URL: https://github.com/apache/arrow/pull/38780#discussion_r1398332884
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
+ auto block_content = Azure::Core::IO::MemoryBodyStream(
+ append_data, strlen(reinterpret_cast<char*>(append_data)));
+ if (block_content.Length() == 0) {
+ return Status::OK();
+ }
+
+ auto size = block_ids_.size();
+
+ // New block ids must always be distinct from the existing block ids.
Otherwise we
+ // will accidentally replace the content of existing blocks, causing
corruption.
+ // We will use monotonically increasing integers.
+ std::string new_block_id = std::to_string(size);
+
+ // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
+ const size_t target_number_of_digits = 5;
+ int required_padding_digits =
+ target_number_of_digits - std::min(target_number_of_digits,
new_block_id.size());
+ new_block_id.insert(0, required_padding_digits, '0');
+ new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id
collisions with
+ // blocks created by other applications.
+ new_block_id = Azure::Core::Convert::Base64Encode(
+ std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
+
+ try {
+ block_blob_client_->StageBlock(new_block_id, block_content);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "StageBlock failed for '" + block_blob_client_->GetUrl() + "'
new_block_id: '" +
+ new_block_id +
+ "' with an unexpected Azure error. Staging new blocks is
fundamental to "
+ "streaming writes to blob storage.",
+ exception);
+ }
+ block_ids_.push_back(new_block_id);
+ pos_ += nbytes;
+ content_length_ += nbytes;
+ return Status::OK();
+ }
+
+ Status Flush() override {
+ RETURN_NOT_OK(CheckClosed("flush"));
+ return CommitBlockList(block_blob_client_, block_ids_, metadata_);
+ }
+
+ protected:
Review Comment:
Can we use `private` here?
```suggestion
private:
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
+ auto block_content = Azure::Core::IO::MemoryBodyStream(
+ append_data, strlen(reinterpret_cast<char*>(append_data)));
Review Comment:
Can we use `nbytes` here?
```suggestion
append_data, nbytes);
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
Review Comment:
Do these metadata replace the existing metadata? Should we merge with the
existing metadata?
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -722,6 +942,30 @@ class AzureFileSystem::Impl {
return Status::OK();
}
+
+ Result<std::shared_ptr<ObjectAppendStream>> OpenAppendStream(
+ const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata, const bool
truncate,
+ AzureFileSystem* fs) {
+ RETURN_NOT_OK(ValidateFileLocation(location));
+ ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(location.path));
+
+ auto block_blob_client =
std::make_shared<Azure::Storage::Blobs::BlockBlobClient>(
+ blob_service_client_->GetBlobContainerClient(location.container)
+ .GetBlockBlobClient(location.path));
+
+ std::shared_ptr<ObjectAppendStream> ptr;
Review Comment:
   Can we use a more meaningful name than `ptr`, such as `stream`?
```suggestion
std::shared_ptr<ObjectAppendStream> stream;
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
Review Comment:
Should we make this `private`?
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
Review Comment:
Can we use `nullptr` here?
```suggestion
try {
block_blob_client->UploadFrom(nullptr, 0);
} catch (const Azure::Storage::StorageException& exception) {
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
+ auto block_content = Azure::Core::IO::MemoryBodyStream(
+ append_data, strlen(reinterpret_cast<char*>(append_data)));
+ if (block_content.Length() == 0) {
+ return Status::OK();
+ }
+
+ auto size = block_ids_.size();
Review Comment:
   Can we use a more meaningful variable name?
```suggestion
const auto n_block_ids = block_ids_.size();
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
+ auto block_content = Azure::Core::IO::MemoryBodyStream(
+ append_data, strlen(reinterpret_cast<char*>(append_data)));
+ if (block_content.Length() == 0) {
+ return Status::OK();
+ }
+
+ auto size = block_ids_.size();
+
+ // New block ids must always be distinct from the existing block ids.
Otherwise we
Review Comment:
```suggestion
// New block ID must always be distinct from the existing block IDs.
Otherwise we
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
+ auto block_content = Azure::Core::IO::MemoryBodyStream(
+ append_data, strlen(reinterpret_cast<char*>(append_data)));
+ if (block_content.Length() == 0) {
+ return Status::OK();
+ }
+
+ auto size = block_ids_.size();
+
+ // New block ids must always be distinct from the existing block ids.
Otherwise we
+ // will accidentally replace the content of existing blocks, causing
corruption.
+ // We will use monotonically increasing integers.
+ std::string new_block_id = std::to_string(size);
+
+ // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
+ const size_t target_number_of_digits = 5;
+ int required_padding_digits =
Review Comment:
```suggestion
const auto required_padding_digits =
```
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
+ auto block_content = Azure::Core::IO::MemoryBodyStream(
+ append_data, strlen(reinterpret_cast<char*>(append_data)));
+ if (block_content.Length() == 0) {
+ return Status::OK();
+ }
+
+ auto size = block_ids_.size();
+
+ // New block ids must always be distinct from the existing block ids.
Otherwise we
+ // will accidentally replace the content of existing blocks, causing
corruption.
+ // We will use monotonically increasing integers.
+ std::string new_block_id = std::to_string(size);
+
+ // Pad to 5 digits, because Azure allows a maximum of 50,000 blocks.
+ const size_t target_number_of_digits = 5;
+ int required_padding_digits =
+ target_number_of_digits - std::min(target_number_of_digits,
new_block_id.size());
+ new_block_id.insert(0, required_padding_digits, '0');
+ new_block_id += "-arrow"; // Add a suffix to reduce risk of block_id
collisions with
+ // blocks created by other applications.
+ new_block_id = Azure::Core::Convert::Base64Encode(
+ std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));
+
Review Comment:
Do we need to check whether the `new_block_id` exists in `block_ids_` or not?
##########
cpp/src/arrow/filesystem/azurefs.cc:
##########
@@ -461,6 +463,224 @@ class ObjectInputFile final : public io::RandomAccessFile
{
int64_t content_length_ = kNoSize;
std::shared_ptr<const KeyValueMetadata> metadata_;
};
+
+Status CreateEmptyBlockBlob(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ std::string s = "";
+ try {
+ block_blob_client->UploadFrom(
+ const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(s.data())),
s.size());
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "UploadFrom failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. There is no existing blob at
this "
+ "location or the existing blob must be replaced so
ObjectAppendStream must "
+ "create a new empty block blob.",
+ exception);
+ }
+ return Status::OK();
+}
+
+Result<Azure::Storage::Blobs::Models::GetBlockListResult> GetBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client)
{
+ try {
+ return block_blob_client->GetBlockList().Value;
+ } catch (Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "GetBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Cannot write to a file without
first "
+ "fetching the existing block list.",
+ exception);
+ }
+}
+
+Azure::Storage::Metadata ArrowMetadataToAzureMetadata(
+ const std::shared_ptr<const KeyValueMetadata>& arrow_metadata) {
+ Azure::Storage::Metadata azure_metadata;
+ for (auto key_value : arrow_metadata->sorted_pairs()) {
+ azure_metadata[key_value.first] = key_value.second;
+ }
+ return azure_metadata;
+}
+
+Status CommitBlockList(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient> block_blob_client,
+ const std::vector<std::string>& block_ids, const Azure::Storage::Metadata&
metadata) {
+ Azure::Storage::Blobs::CommitBlockListOptions options;
+ options.Metadata = metadata;
+ try {
+ // CommitBlockList puts all block_ids in the latest element. That means in
the case of
+ // overlapping block_ids the newly staged block ids will always replace the
+ // previously committed blocks.
+ //
https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
+ block_blob_client->CommitBlockList(block_ids, options);
+ } catch (const Azure::Storage::StorageException& exception) {
+ return internal::ExceptionToStatus(
+ "CommitBlockList failed for '" + block_blob_client->GetUrl() +
+ "' with an unexpected Azure error. Committing is required to flush
an "
+ "output/append stream.",
+ exception);
+ }
+ return Status::OK();
+}
+
+class ObjectAppendStream final : public io::OutputStream {
+ public:
+ ObjectAppendStream(
+ std::shared_ptr<Azure::Storage::Blobs::BlockBlobClient>
block_blob_client,
+ const io::IOContext& io_context, const AzureLocation& location,
+ const std::shared_ptr<const KeyValueMetadata>& metadata,
+ const AzureOptions& options, int64_t size = kNoSize)
+ : block_blob_client_(std::move(block_blob_client)),
+ io_context_(io_context),
+ location_(location),
+ content_length_(size) {
+ if (metadata && metadata->size() != 0) {
+ metadata_ = ArrowMetadataToAzureMetadata(metadata);
+ } else if (options.default_metadata && options.default_metadata->size() !=
0) {
+ metadata_ = ArrowMetadataToAzureMetadata(options.default_metadata);
+ }
+ }
+
+ ~ObjectAppendStream() override {
+ // For compliance with the rest of the IO stack, Close rather than Abort,
+ // even though it may be more expensive.
+ io::internal::CloseFromDestructor(this);
+ }
+
+ Status Init() {
+ if (content_length_ != kNoSize) {
+ DCHECK_GE(content_length_, 0);
+ pos_ = content_length_;
+ } else {
+ try {
+ auto properties = block_blob_client_->GetProperties();
+ content_length_ = properties.Value.BlobSize;
+ pos_ = content_length_;
+ } catch (const Azure::Storage::StorageException& exception) {
+ if (exception.StatusCode ==
Azure::Core::Http::HttpStatusCode::NotFound) {
+ RETURN_NOT_OK(CreateEmptyBlockBlob(block_blob_client_));
+ } else {
+ return internal::ExceptionToStatus(
+ "GetProperties failed for '" + block_blob_client_->GetUrl() +
+ "' with an unexpected Azure error. Can not initialise an "
+ "ObjectAppendStream without knowing whether a file already
exists at "
+ "this path, and if it exists, its size.",
+ exception);
+ }
+ content_length_ = 0;
+ }
+ }
+ if (content_length_ > 0) {
+ ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
+ for (auto block : block_list.CommittedBlocks) {
+ block_ids_.push_back(block.Name);
+ }
+ }
+ return Status::OK();
+ }
+
+ Status Abort() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ Status Close() override {
+ if (closed_) {
+ return Status::OK();
+ }
+ RETURN_NOT_OK(Flush());
+ block_blob_client_ = nullptr;
+ closed_ = true;
+ return Status::OK();
+ }
+
+ bool closed() const override { return closed_; }
+
+ Status CheckClosed(const char* action) const {
+ if (closed_) {
+ return Status::Invalid("Cannot ", action, " on closed stream.");
+ }
+ return Status::OK();
+ }
+
+ Result<int64_t> Tell() const override {
+ RETURN_NOT_OK(CheckClosed("tell"));
+ return pos_;
+ }
+
+ Status Write(const std::shared_ptr<Buffer>& buffer) override {
+ return DoAppend(buffer->data(), buffer->size(), buffer);
+ }
+
+ Status Write(const void* data, int64_t nbytes) override {
+ return DoAppend(data, nbytes);
+ }
+
+ Status DoAppend(const void* data, int64_t nbytes,
+ std::shared_ptr<Buffer> owned_buffer = nullptr) {
+ RETURN_NOT_OK(CheckClosed("append"));
+ auto append_data = reinterpret_cast<uint8_t*>((void*)data);
Review Comment:
Can we use `const uint8_t*` here?
```suggestion
auto append_data = reinterpret_cast<const uint8_t*>(data);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]