This is an automated email from the ASF dual-hosted git repository.
bkietz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 23dfd0e864 GH-37511: [C++] Implement file reads for Azure filesystem
(#38269)
23dfd0e864 is described below
commit 23dfd0e8643799b803b286e88ca6253303ecb703
Author: Thomas Newton <[email protected]>
AuthorDate: Thu Oct 19 13:50:35 2023 +0100
GH-37511: [C++] Implement file reads for Azure filesystem (#38269)
### Rationale for this change
We want a C++ implementation of an Azure filesystem. Reading files is the
first step.
### What changes are included in this PR?
Adds an implementation of `io::RandomAccessFile` for Azure blob storage
(with or without hierarchical namespace (HNS) a.k.a datalake gen 2). This is
largely copied from https://github.com/apache/arrow/pull/12914. Using this
`io::RandomAccessFile` implementation we implement the input file and stream
methods of the `AzureFileSystem`.
I've made a few changes to the implementation from
https://github.com/apache/arrow/pull/12914. The biggest one is removing use of
the Azure SDK datalake APIs. These APIs cannot be tested with `azurite`, they
are only beneficial for listing operations on HNS enabled accounts and
detecting a HNS enabled account is quite difficult (unless you use
significantly elevated Azure permissions). Adding 2 different code paths for
normal blob storage and datalake gen 2 seems like a bad idea to me [...]
### Are these changes tested?
Yes. The tests are all based on the tests from the GCS filesystem with
minimal changes. I remember reading a review comment on
https://github.com/apache/arrow/pull/12914 which recommended this approach.
There are a few places where the GCS tests relied on file writes or file
info methods so I've replaced those with direct calls to the Azure blob client
and left TODO comments saying to switch them to use the AzureFilesystem when
the relevant methods are implemented.
### Are there any user-facing changes?
Yes. File reads using the Azure filesystem are now supported.
* Closes: #37511
Lead-authored-by: Thomas Newton <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Signed-off-by: Benjamin Kietzman <[email protected]>
---
cpp/src/arrow/filesystem/azurefs.cc | 335 ++++++++++++++++++++++++++--
cpp/src/arrow/filesystem/azurefs.h | 3 +
cpp/src/arrow/filesystem/azurefs_test.cc | 371 ++++++++++++++++++++++++++++---
3 files changed, 658 insertions(+), 51 deletions(-)
diff --git a/cpp/src/arrow/filesystem/azurefs.cc
b/cpp/src/arrow/filesystem/azurefs.cc
index fcbae332d2..179be069b2 100644
--- a/cpp/src/arrow/filesystem/azurefs.cc
+++ b/cpp/src/arrow/filesystem/azurefs.cc
@@ -17,11 +17,17 @@
#include "arrow/filesystem/azurefs.h"
-#include <azure/identity/default_azure_credential.hpp>
#include <azure/storage/blobs.hpp>
+#include "arrow/buffer.h"
+#include "arrow/filesystem/path_util.h"
+#include "arrow/filesystem/util_internal.h"
#include "arrow/result.h"
#include "arrow/util/checked_cast.h"
+#include "arrow/util/future.h"
+#include "arrow/util/key_value_metadata.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/string.h"
namespace arrow {
namespace fs {
@@ -37,34 +43,329 @@ bool AzureOptions::Equals(const AzureOptions& other) const
{
credentials_kind == other.credentials_kind);
}
+// Configures these options for shared-key authentication: derives the blob and
+// dfs endpoint URLs from the account name (depending on the selected backend)
+// and stores a StorageSharedKeyCredential built from the name and key.
+// Always returns Status::OK().
+Status AzureOptions::ConfigureAccountKeyCredentials(const std::string& account_name,
+                                                    const std::string& account_key) {
+  const bool targets_azurite = (backend == AzureBackend::Azurite);
+  if (!targets_azurite) {
+    account_dfs_url = "https://" + account_name + ".dfs.core.windows.net/";
+    account_blob_url = "https://" + account_name + ".blob.core.windows.net/";
+  } else {
+    // Both URLs point at the same local emulator address.
+    const std::string emulator_url = "http://127.0.0.1:10000/" + account_name + "/";
+    account_blob_url = emulator_url;
+    account_dfs_url = emulator_url;
+  }
+  auto shared_key = std::make_shared<Azure::Storage::StorageSharedKeyCredential>(
+      account_name, account_key);
+  storage_credentials_provider = std::move(shared_key);
+  credentials_kind = AzureCredentialsKind::StorageCredentials;
+  return Status::OK();
+}
+namespace {
+
+// An AzureFileSystem represents a single Azure storage account. AzurePath describes a
+// container and path within that storage account.
+struct AzurePath {
+  std::string full_path;
+  std::string container;
+  std::string path_to_file;
+  std::vector<std::string> path_to_file_parts;
+
+  // Parses a path string into its container and in-container components.
+  //
+  // Example expected string format: testcontainer/testdir/testfile.txt
+  //   container = testcontainer
+  //   path_to_file = testdir/testfile.txt
+  //   path_to_file_parts = [testdir, testfile.txt]
+  //
+  // Returns Status::Invalid when `s` looks like a URI, starts with a separator,
+  // or fails validation of its path parts.
+  static Result<AzurePath> FromString(const std::string& s) {
+    if (internal::IsLikelyUri(s)) {
+      return Status::Invalid(
+          "Expected an Azure object path of the form 'container/path...', got a URI: '",
+          s, "'");
+    }
+    const auto trimmed = internal::RemoveTrailingSlash(s);
+    const auto sep_pos = trimmed.find_first_of(internal::kSep);
+    if (sep_pos == std::string::npos) {
+      // No separator at all: the whole string names the container.
+      return AzurePath{std::string(trimmed), std::string(trimmed), "", {}};
+    }
+    if (sep_pos == 0) {
+      return Status::Invalid("Path cannot start with a separator ('", s, "')");
+    }
+    AzurePath parsed;
+    parsed.full_path = std::string(trimmed);
+    parsed.container = std::string(trimmed.substr(0, sep_pos));
+    parsed.path_to_file = std::string(trimmed.substr(sep_pos + 1));
+    parsed.path_to_file_parts = internal::SplitAbstractPath(parsed.path_to_file);
+    RETURN_NOT_OK(Validate(parsed));
+    return parsed;
+  }
+
+  // Validates the in-container path parts; on failure the error message is
+  // re-issued as Status::Invalid with the full path appended.
+  static Status Validate(const AzurePath& path) {
+    auto parts_status = internal::ValidateAbstractPathParts(path.path_to_file_parts);
+    if (parts_status.ok()) {
+      return parts_status;
+    }
+    return Status::Invalid(parts_status.message(), " in path ", path.full_path);
+  }
+
+  // Returns the path with its last component removed. Requires has_parent().
+  AzurePath parent() const {
+    DCHECK(has_parent());
+    AzurePath up;
+    up.container = container;
+    up.path_to_file_parts = path_to_file_parts;
+    up.path_to_file_parts.pop_back();
+    up.path_to_file = internal::JoinAbstractPath(up.path_to_file_parts);
+    up.full_path = up.container;
+    if (!up.path_to_file.empty()) {
+      up.full_path += internal::kSep;
+      up.full_path += up.path_to_file;
+    }
+    return up;
+  }
+
+  // True when there is a path component below the container.
+  bool has_parent() const { return !path_to_file.empty(); }
+
+  // True when neither a container nor an in-container path is set.
+  bool empty() const { return container.empty() && path_to_file.empty(); }
+
+  // Two paths are equal when container and in-container path both match.
+  bool operator==(const AzurePath& other) const {
+    if (container != other.container) {
+      return false;
+    }
+    return path_to_file == other.path_to_file;
+  }
+};
+
+// Forwards to the filesystem-internal PathNotFound helper using the full
+// string form of the given Azure path.
+Status PathNotFound(const AzurePath& path) {
+  const std::string& missing = path.full_path;
+  return ::arrow::fs::internal::PathNotFound(missing);
+}
+
+// Forwards to the filesystem-internal NotAFile helper using the full string
+// form of the given Azure path.
+Status NotAFile(const AzurePath& path) {
+  const std::string& offending = path.full_path;
+  return ::arrow::fs::internal::NotAFile(offending);
+}
+
+// Checks that `path` can name a file: it needs a container (otherwise
+// PathNotFound) and a non-empty path inside that container (otherwise NotAFile).
+Status ValidateFilePath(const AzurePath& path) {
+  if (!path.container.empty()) {
+    return path.path_to_file.empty() ? NotAFile(path) : Status::OK();
+  }
+  return PathNotFound(path);
+}
+
+// Converts an Azure storage exception into an IOError status whose message is
+// `prefix`, followed by " Azure Error: " and the exception's what() text.
+Status ErrorToStatus(const std::string& prefix,
+                     const Azure::Storage::StorageException& exception) {
+  const char* azure_message = exception.what();
+  return Status::IOError(prefix, " Azure Error: ", azure_message);
+}
+
+// Copies every (key, value) entry of `result` into a new KeyValueMetadata, in
+// iteration order. `result` must be iterable and yield pair-like elements with
+// `first` and `second` members.
+template <typename ObjectResult>
+std::shared_ptr<const KeyValueMetadata> GetObjectMetadata(const ObjectResult& result) {
+  auto md = std::make_shared<KeyValueMetadata>();
+  // Bind by const reference: iterating by value would copy each entry.
+  for (const auto& prop : result) {
+    md->Append(prop.first, prop.second);
+  }
+  return md;
+}
+
+// Read-only random access file backed by a single Azure blob. The blob size (and,
+// when fetched, its metadata) is captured in Init(); reads are issued as ranged
+// downloads through the BlobClient.
+class ObjectInputFile final : public io::RandomAccessFile {
+ public:
+  // `size` may be supplied when already known; kNoSize makes Init() query it.
+  ObjectInputFile(std::shared_ptr<Azure::Storage::Blobs::BlobClient> blob_client,
+                  const io::IOContext& io_context, AzurePath path,
+                  int64_t size = kNoSize)
+      : blob_client_(std::move(blob_client)),
+        io_context_(io_context),
+        path_(std::move(path)),
+        content_length_(size) {}
+
+  // Resolves the blob size. When a size was passed to the constructor nothing is
+  // fetched (and no metadata is recorded); otherwise the blob properties are
+  // requested and both size and metadata are stored. A NotFound response is
+  // reported as PathNotFound, any other storage error as an IOError.
+  Status Init() {
+    const bool size_known = (content_length_ != kNoSize);
+    if (size_known) {
+      DCHECK_GE(content_length_, 0);
+      return Status::OK();
+    }
+    try {
+      const auto response = blob_client_->GetProperties();
+      const auto& props = response.Value;
+      content_length_ = props.BlobSize;
+      metadata_ = GetObjectMetadata(props.Metadata);
+    } catch (const Azure::Storage::StorageException& exception) {
+      const bool not_found =
+          exception.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound;
+      if (!not_found) {
+        return ErrorToStatus(
+            "When fetching properties for '" + blob_client_->GetUrl() + "': ",
+            exception);
+      }
+      // Could be either container or blob not found.
+      return PathNotFound(path_);
+    }
+    return Status::OK();
+  }
+
+  // Returns Invalid if the file has been closed; `action` names the operation.
+  Status CheckClosed(const char* action) const {
+    if (!closed_) {
+      return Status::OK();
+    }
+    return Status::Invalid("Cannot ", action, " on closed file.");
+  }
+
+  // Rejects negative positions (Invalid) and positions beyond the end (IOError).
+  // A position equal to the content length is accepted.
+  Status CheckPosition(int64_t position, const char* action) const {
+    DCHECK_GE(content_length_, 0);
+    if (position < 0) {
+      return Status::Invalid("Cannot ", action, " from negative position");
+    }
+    if (content_length_ < position) {
+      return Status::IOError("Cannot ", action, " past end of file");
+    }
+    return Status::OK();
+  }
+
+  // RandomAccessFile APIs
+
+  // Returns the metadata captured by Init(), if any.
+  Result<std::shared_ptr<const KeyValueMetadata>> ReadMetadata() override {
+    return metadata_;
+  }
+
+  // Returns the already captured metadata; no request is issued here.
+  Future<std::shared_ptr<const KeyValueMetadata>> ReadMetadataAsync(
+      const io::IOContext& io_context) override {
+    return metadata_;
+  }
+
+  // Drops the blob client and marks the file closed.
+  Status Close() override {
+    closed_ = true;
+    blob_client_ = nullptr;
+    return Status::OK();
+  }
+
+  bool closed() const override { return closed_; }
+
+  // Current position of the sequential read cursor.
+  Result<int64_t> Tell() const override {
+    RETURN_NOT_OK(CheckClosed("tell"));
+    return pos_;
+  }
+
+  // Size of the blob as resolved at construction/Init() time.
+  Result<int64_t> GetSize() override {
+    RETURN_NOT_OK(CheckClosed("size"));
+    return content_length_;
+  }
+
+  // Moves the sequential read cursor to `position`.
+  Status Seek(int64_t position) override {
+    RETURN_NOT_OK(CheckClosed("seek"));
+    RETURN_NOT_OK(CheckPosition(position, "seek"));
+
+    pos_ = position;
+    return Status::OK();
+  }
+
+  // Reads up to `nbytes` starting at `position` into `out` and returns the number
+  // of bytes reported by the download. The request is clamped to the bytes
+  // remaining after `position`; a clamped size of zero returns 0 without a request.
+  Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override {
+    RETURN_NOT_OK(CheckClosed("read"));
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    const int64_t remaining = content_length_ - position;
+    nbytes = std::min(nbytes, remaining);
+    if (nbytes == 0) {
+      return 0;
+    }
+
+    // Read the desired range of bytes
+    Azure::Storage::Blobs::DownloadBlobToOptions download_options;
+    download_options.Range = Azure::Core::Http::HttpRange{position, nbytes};
+    auto* destination = reinterpret_cast<uint8_t*>(out);
+    try {
+      const auto response = blob_client_->DownloadTo(destination, nbytes, download_options);
+      return response.Value.ContentRange.Length.Value();
+    } catch (const Azure::Storage::StorageException& exception) {
+      return ErrorToStatus("When reading from '" + blob_client_->GetUrl() +
+                               "' at position " + std::to_string(position) + " for " +
+                               std::to_string(nbytes) + " bytes: ",
+                           exception);
+    }
+  }
+
+  // Buffer-returning variant of ReadAt: allocates from the IO context's pool and
+  // shrinks the buffer to the number of bytes actually read.
+  Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes) override {
+    RETURN_NOT_OK(CheckClosed("read"));
+    RETURN_NOT_OK(CheckPosition(position, "read"));
+
+    // No need to allocate more than the remaining number of bytes
+    const int64_t remaining = content_length_ - position;
+    nbytes = std::min(nbytes, remaining);
+
+    ARROW_ASSIGN_OR_RAISE(auto buffer,
+                          AllocateResizableBuffer(nbytes, io_context_.pool()));
+    if (nbytes > 0) {
+      ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+                            ReadAt(position, nbytes, buffer->mutable_data()));
+      DCHECK_LE(bytes_read, nbytes);
+      RETURN_NOT_OK(buffer->Resize(bytes_read));
+    }
+    return std::move(buffer);
+  }
+
+  // Sequential read into `out`; advances the cursor by the bytes read.
+  Result<int64_t> Read(int64_t nbytes, void* out) override {
+    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out));
+    pos_ += bytes_read;
+    return bytes_read;
+  }
+
+  // Sequential read into a new buffer; advances the cursor by the buffer size.
+  Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override {
+    ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes));
+    pos_ += buffer->size();
+    return std::move(buffer);
+  }
+
+ private:
+  std::shared_ptr<Azure::Storage::Blobs::BlobClient> blob_client_;
+  const io::IOContext io_context_;
+  AzurePath path_;
+
+  bool closed_ = false;
+  int64_t pos_ = 0;
+  int64_t content_length_ = kNoSize;
+  std::shared_ptr<const KeyValueMetadata> metadata_;
+};
+
+} // namespace
+
// -----------------------------------------------------------------------
// AzureFilesystem Implementation
class AzureFileSystem::Impl {
public:
io::IOContext io_context_;
- bool is_hierarchical_namespace_enabled_;
+ std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
AzureOptions options_;
explicit Impl(AzureOptions options, io::IOContext io_context)
: io_context_(io_context), options_(std::move(options)) {}
Status Init() {
- // TODO: GH-18014 Delete this once we have a proper implementation. This
just
- // initializes a pointless Azure blob service client with a fake endpoint
to ensure
- // the build will fail if the Azure SDK build is broken.
- auto default_credential =
std::make_shared<Azure::Identity::DefaultAzureCredential>();
- auto service_client = Azure::Storage::Blobs::BlobServiceClient(
- "http://fake-blob-storage-endpoint", default_credential);
- if (options_.backend == AzureBackend::Azurite) {
- // gen1Client_->GetAccountInfo().Value.IsHierarchicalNamespaceEnabled
- // throws error in azurite
- is_hierarchical_namespace_enabled_ = false;
- }
+ service_client_ =
std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+ options_.account_blob_url, options_.storage_credentials_provider);
return Status::OK();
}
const AzureOptions& options() const { return options_; }
+
+  // Opens the blob named by `s` ("container/path/to/blob") for reading.
+  // Fails if `s` has a trailing slash, cannot be parsed as an AzurePath, does
+  // not name a file, or if initialising the file fails.
+  Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const std::string& s,
+                                                         AzureFileSystem* fs) {
+    ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(s));
+    ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(s));
+    RETURN_NOT_OK(ValidateFilePath(path));
+
+    auto container_client = service_client_->GetBlobContainerClient(path.container);
+    auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
+        container_client.GetBlobClient(path.path_to_file));
+
+    auto file =
+        std::make_shared<ObjectInputFile>(blob_client, fs->io_context(), std::move(path));
+    RETURN_NOT_OK(file->Init());
+    return file;
+  }
+
+  // Opens the blob described by `info` for reading, passing the size recorded in
+  // `info` on to the file. Only FileType::File and FileType::Unknown are
+  // accepted: NotFound yields PathNotFound and any other type yields NotAFile.
+  Result<std::shared_ptr<ObjectInputFile>> OpenInputFile(const FileInfo& info,
+                                                         AzureFileSystem* fs) {
+    ARROW_RETURN_NOT_OK(internal::AssertNoTrailingSlash(info.path()));
+    switch (info.type()) {
+      case FileType::NotFound:
+        return ::arrow::fs::internal::PathNotFound(info.path());
+      case FileType::File:
+      case FileType::Unknown:
+        break;
+      default:
+        return ::arrow::fs::internal::NotAFile(info.path());
+    }
+    ARROW_ASSIGN_OR_RAISE(auto path, AzurePath::FromString(info.path()));
+    RETURN_NOT_OK(ValidateFilePath(path));
+
+    auto container_client = service_client_->GetBlobContainerClient(path.container);
+    auto blob_client = std::make_shared<Azure::Storage::Blobs::BlobClient>(
+        container_client.GetBlobClient(path.path_to_file));
+
+    auto file = std::make_shared<ObjectInputFile>(blob_client, fs->io_context(),
+                                                  std::move(path), info.size());
+    RETURN_NOT_OK(file->Init());
+    return file;
+  }
};
const AzureOptions& AzureFileSystem::options() const { return
impl_->options(); }
@@ -118,22 +419,22 @@ Status AzureFileSystem::CopyFile(const std::string& src,
const std::string& dest
Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
const std::string& path) {
- return Status::NotImplemented("The Azure FileSystem is not fully
implemented");
+ return impl_->OpenInputFile(path, this);
}
Result<std::shared_ptr<io::InputStream>> AzureFileSystem::OpenInputStream(
const FileInfo& info) {
- return Status::NotImplemented("The Azure FileSystem is not fully
implemented");
+ return impl_->OpenInputFile(info, this);
}
Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
const std::string& path) {
- return Status::NotImplemented("The Azure FileSystem is not fully
implemented");
+ return impl_->OpenInputFile(path, this);
}
Result<std::shared_ptr<io::RandomAccessFile>> AzureFileSystem::OpenInputFile(
const FileInfo& info) {
- return Status::NotImplemented("The Azure FileSystem is not fully
implemented");
+ return impl_->OpenInputFile(info, this);
}
Result<std::shared_ptr<io::OutputStream>> AzureFileSystem::OpenOutputStream(
diff --git a/cpp/src/arrow/filesystem/azurefs.h
b/cpp/src/arrow/filesystem/azurefs.h
index e5af4d23aa..1f7047ff94 100644
--- a/cpp/src/arrow/filesystem/azurefs.h
+++ b/cpp/src/arrow/filesystem/azurefs.h
@@ -79,6 +79,9 @@ struct ARROW_EXPORT AzureOptions {
AzureOptions();
+ /// \brief Configure shared-key (account name + account key) authentication.
+ ///
+ /// Sets account_blob_url, account_dfs_url, storage_credentials_provider and
+ /// credentials_kind on this options object. The URLs depend on `backend`, so
+ /// set `backend` before calling this.
+ ///
+ /// \param account_name name of the storage account
+ /// \param account_key shared key for that account
+ Status ConfigureAccountKeyCredentials(const std::string& account_name,
+ const std::string& account_key);
+
bool Equals(const AzureOptions& other) const;
};
diff --git a/cpp/src/arrow/filesystem/azurefs_test.cc
b/cpp/src/arrow/filesystem/azurefs_test.cc
index 9bf7cb8e75..5d454bdc33 100644
--- a/cpp/src/arrow/filesystem/azurefs_test.cc
+++ b/cpp/src/arrow/filesystem/azurefs_test.cc
@@ -34,23 +34,24 @@
#include <boost/process.hpp>
#include "arrow/filesystem/azurefs.h"
-#include "arrow/util/io_util.h"
+
+#include <random>
+#include <string>
#include <gmock/gmock-matchers.h>
#include <gmock/gmock-more-matchers.h>
#include <gtest/gtest.h>
-
-#include <string>
-
-#include "arrow/testing/gtest_util.h"
-#include "arrow/testing/util.h"
-
#include <azure/identity/client_secret_credential.hpp>
#include <azure/identity/default_azure_credential.hpp>
#include <azure/identity/managed_identity_credential.hpp>
#include <azure/storage/blobs.hpp>
#include <azure/storage/common/storage_credential.hpp>
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/util/io_util.h"
+#include "arrow/util/key_value_metadata.h"
+
namespace arrow {
using internal::TemporaryDir;
namespace fs {
@@ -61,6 +62,15 @@ using ::testing::IsEmpty;
using ::testing::Not;
using ::testing::NotNull;
+// Multi-line sample text; the test fixture uploads it as the content of the
+// pre-existing blob and the read tests compare downloaded bytes against it.
+auto const* kLoremIpsum = R"""(
+Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
+incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis
+nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
+Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu
+fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
+culpa qui officia deserunt mollit anim id est laborum.
+)""";
+
class AzuriteEnv : public ::testing::Environment {
public:
AzuriteEnv() {
@@ -113,33 +123,6 @@ AzuriteEnv* GetAzuriteEnv() {
// Placeholder tests
// TODO: GH-18014 Remove once a proper test is added
-TEST(AzureFileSystem, UploadThenDownload) {
- const std::string container_name = "sample-container";
- const std::string blob_name = "sample-blob.txt";
- const std::string blob_content = "Hello Azure!";
-
- const std::string& account_name = GetAzuriteEnv()->account_name();
- const std::string& account_key = GetAzuriteEnv()->account_key();
-
- auto credential =
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(
- account_name, account_key);
-
- auto service_client = Azure::Storage::Blobs::BlobServiceClient(
- std::string("http://127.0.0.1:10000/") + account_name, credential);
- auto container_client =
service_client.GetBlobContainerClient(container_name);
- container_client.CreateIfNotExists();
- auto blob_client = container_client.GetBlockBlobClient(blob_name);
-
- std::vector<uint8_t> buffer(blob_content.begin(), blob_content.end());
- blob_client.UploadFrom(buffer.data(), buffer.size());
-
- std::vector<uint8_t> downloaded_content(blob_content.size());
- blob_client.DownloadTo(downloaded_content.data(), downloaded_content.size());
-
- EXPECT_EQ(std::string(downloaded_content.begin(), downloaded_content.end()),
- blob_content);
-}
-
TEST(AzureFileSystem, InitializeCredentials) {
auto default_credential =
std::make_shared<Azure::Identity::DefaultAzureCredential>();
auto managed_identity_credential =
@@ -154,6 +137,326 @@ TEST(AzureFileSystem, OptionsCompare) {
EXPECT_TRUE(options.Equals(options));
}
+// Test fixture: for every test it creates a randomly named container holding one
+// pre-existing blob (kLoremIpsum) and an AzureFileSystem pointed at Azurite, and
+// offers helpers to generate random data and upload it directly via the SDK.
+class TestAzureFileSystem : public ::testing::Test {
+ public:
+  std::shared_ptr<FileSystem> fs_;
+  std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_;
+  std::mt19937_64 generator_;
+  std::string container_name_;
+
+  TestAzureFileSystem() : generator_(std::random_device()()) {}
+
+  // Builds options targeting the Azurite emulator with its account credentials.
+  AzureOptions MakeOptions() {
+    const std::string& account_name = GetAzuriteEnv()->account_name();
+    const std::string& account_key = GetAzuriteEnv()->account_key();
+    AzureOptions options;
+    options.backend = AzureBackend::Azurite;
+    ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name, account_key));
+    return options;
+  }
+
+  void SetUp() override {
+    ASSERT_THAT(GetAzuriteEnv(), NotNull());
+    ASSERT_OK(GetAzuriteEnv()->status());
+
+    container_name_ = RandomChars(32);
+    auto options = MakeOptions();
+    service_client_ = std::make_shared<Azure::Storage::Blobs::BlobServiceClient>(
+        options.account_blob_url, options.storage_credentials_provider);
+    ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options));
+    auto container_client = service_client_->GetBlobContainerClient(container_name_);
+    container_client.CreateIfNotExists();
+
+    auto blob_client = container_client.GetBlockBlobClient(PreexistingObjectName());
+    blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum),
+                           strlen(kLoremIpsum));
+  }
+
+  // Deletes every container returned by the listing call.
+  void TearDown() override {
+    auto containers = service_client_->ListBlobContainers();
+    // Iterate by const reference: the items are only read for their name.
+    for (const auto& container : containers.BlobContainers) {
+      auto container_client = service_client_->GetBlobContainerClient(container.Name);
+      container_client.DeleteIfExists();
+    }
+  }
+
+  std::string PreexistingContainerName() const { return container_name_; }
+
+  // Container name followed by a trailing '/'.
+  std::string PreexistingContainerPath() const {
+    return PreexistingContainerName() + '/';
+  }
+
+  static std::string PreexistingObjectName() { return "test-object-name"; }
+
+  std::string PreexistingObjectPath() const {
+    return PreexistingContainerPath() + PreexistingObjectName();
+  }
+
+  // Path inside the pre-existing container that no test uploads to.
+  std::string NotFoundObjectPath() const {
+    return PreexistingContainerPath() + "not-found";
+  }
+
+  // Returns "<lineno>: <random chars>\n", exactly `width` characters long
+  // (`width` must exceed the length of the "<lineno>: " prefix plus the newline).
+  std::string RandomLine(int lineno, std::size_t width) {
+    auto line = std::to_string(lineno) + ": ";
+    line += RandomChars(width - line.size() - 1);
+    line += '\n';
+    return line;
+  }
+
+  // Uniformly distributed index in [0, end); `end` must be non-zero.
+  std::size_t RandomIndex(std::size_t end) {
+    return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_);
+  }
+
+  // Returns `count` characters drawn uniformly from lowercase letters and digits.
+  std::string RandomChars(std::size_t count) {
+    auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789");
+    std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1);
+    std::string s;
+    s.reserve(count);
+    for (std::size_t i = 0; i < count; ++i) {
+      s.push_back(fillers[d(generator_)]);
+    }
+    return s;
+  }
+
+  // Concatenates `lines` and uploads the first `total_size` bytes as the blob
+  // `path_to_file` inside the pre-existing container.
+  void UploadLines(const std::vector<std::string>& lines, const char* path_to_file,
+                   int total_size) {
+    // TODO(GH-38333): Switch to using Azure filesystem to write once it's implemented.
+    // Append in place instead of std::accumulate, which would re-copy the
+    // growing string at every step.
+    std::string all_lines;
+    for (const auto& line : lines) {
+      all_lines += line;
+    }
+    // total_size is passed as the upload length, so it must not exceed the
+    // number of bytes actually assembled above.
+    ASSERT_GE(total_size, 0);
+    ASSERT_LE(static_cast<std::size_t>(total_size), all_lines.size());
+
+    auto blob_client = service_client_->GetBlobContainerClient(PreexistingContainerName())
+                           .GetBlockBlobClient(path_to_file);
+    blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()),
+                           total_size);
+  }
+};
+
+// Opening the pre-existing blob by path and reading it in a single call yields
+// the full uploaded text.
+TEST_F(TestAzureFileSystem, OpenInputStreamString) {
+  const auto object_path = PreexistingObjectPath();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::InputStream> stream,
+                       fs_->OpenInputStream(object_path));
+
+  ASSERT_OK_AND_ASSIGN(auto contents, stream->Read(1024));
+  EXPECT_EQ(contents->ToString(), kLoremIpsum);
+}
+
+// Reading the pre-existing blob in 16-byte chunks until an empty buffer comes
+// back reassembles the full uploaded text.
+TEST_F(TestAzureFileSystem, OpenInputStreamStringBuffers) {
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+  std::string contents;
+  while (true) {
+    ASSERT_OK_AND_ASSIGN(auto buffer, stream->Read(16));
+    // Check for null before the buffer is dereferenced, not afterwards.
+    ASSERT_NE(buffer, nullptr);
+    if (buffer->size() == 0) {
+      break;
+    }
+    contents.append(buffer->ToString());
+  }
+
+  EXPECT_EQ(contents, kLoremIpsum);
+}
+
+// Opening the pre-existing blob through a hand-built FileInfo (type File) and
+// reading it in one call yields the full uploaded text.
+TEST_F(TestAzureFileSystem, OpenInputStreamInfo) {
+  // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info,
+  // fs->GetFileInfo(PreexistingObjectPath()));
+  const arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::InputStream> stream,
+                       fs_->OpenInputStream(info));
+
+  ASSERT_OK_AND_ASSIGN(auto contents, stream->Read(1024));
+  EXPECT_EQ(contents->ToString(), kLoremIpsum);
+}
+
+// Reading from a zero-length blob succeeds and reports zero bytes read.
+TEST_F(TestAzureFileSystem, OpenInputStreamEmpty) {
+  const auto path_to_file = "empty-object.txt";
+  const auto path = PreexistingContainerPath() + path_to_file;
+
+  // Upload an empty blob directly through the SDK.
+  auto container_client =
+      service_client_->GetBlobContainerClient(PreexistingContainerName());
+  auto empty_blob = container_client.GetBlockBlobClient(path_to_file);
+  empty_blob.UploadFrom(nullptr, 0);
+
+  ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(path));
+  std::array<char, 1024> scratch{};
+  ASSERT_OK_AND_ASSIGN(std::int64_t bytes_read,
+                       stream->Read(scratch.size(), scratch.data()));
+  EXPECT_EQ(bytes_read, 0);
+}
+
+// Opening a stream on a blob that was never uploaded fails with IOError.
+TEST_F(TestAzureFileSystem, OpenInputStreamNotFound) {
+  const auto missing_path = NotFoundObjectPath();
+  ASSERT_RAISES(IOError, fs_->OpenInputStream(missing_path));
+}
+
+// Opening a stream from a FileInfo that does not describe a readable file
+// (a directory, or an entry marked NotFound) fails with IOError.
+TEST_F(TestAzureFileSystem, OpenInputStreamInfoInvalid) {
+  // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info,
+  // fs->GetFileInfo(PreexistingContainerPath()));
+  arrow::fs::FileInfo info(PreexistingContainerPath(), FileType::Directory);
+  ASSERT_RAISES(IOError, fs_->OpenInputStream(info));
+
+  // Use the not-found object path (no trailing slash) so the failure comes from
+  // the FileType::NotFound check rather than from the trailing-slash check.
+  // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info,
+  // fs->GetFileInfo(NotFoundObjectPath()));
+  arrow::fs::FileInfo info2(NotFoundObjectPath(), FileType::NotFound);
+  ASSERT_RAISES(IOError, fs_->OpenInputStream(info2));
+}
+
+// Passing a URI instead of a plain "container/path" string is rejected as Invalid.
+TEST_F(TestAzureFileSystem, OpenInputStreamUri) {
+  const std::string uri = "abfss://" + PreexistingObjectPath();
+  ASSERT_RAISES(Invalid, fs_->OpenInputStream(uri));
+}
+
+// A path with a trailing slash cannot be opened as a stream (IOError).
+TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) {
+  const std::string with_slash = PreexistingObjectPath() + '/';
+  ASSERT_RAISES(IOError, fs_->OpenInputStream(with_slash));
+}
+
+// Metadata set on the pre-existing blob through the SDK is visible through
+// ReadMetadata() on a stream opened afterwards.
+TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) {
+  service_client_->GetBlobContainerClient(PreexistingContainerName())
+      .GetBlobClient(PreexistingObjectName())
+      .SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}});
+
+  std::shared_ptr<io::InputStream> stream;
+  ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath()));
+
+  std::shared_ptr<const KeyValueMetadata> actual;
+  ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata());
+  // TODO(GH-38330): This is asserting that the user defined metadata is returned but this
+  // is probably not the correct behaviour.
+  ASSERT_OK_AND_EQ("value0", actual->Get("key0"));
+}
+
+// After Close(), reads and Tell() on the stream are rejected as Invalid.
+TEST_F(TestAzureFileSystem, OpenInputStreamClosed) {
+  ASSERT_OK_AND_ASSIGN(auto closed_stream, fs_->OpenInputStream(PreexistingObjectPath()));
+  ASSERT_OK(closed_stream->Close());
+
+  std::array<char, 16> scratch{};
+  ASSERT_RAISES(Invalid, closed_stream->Read(scratch.size(), scratch.data()));
+  ASSERT_RAISES(Invalid, closed_stream->Read(scratch.size()));
+  ASSERT_RAISES(Invalid, closed_stream->Tell());
+}
+
+// Interleaves sequential Read() calls with positional ReadAt() calls on the same
+// file and checks that each returns the expected fixed-width line.
+TEST_F(TestAzureFileSystem, OpenInputFileMixedReadVsReadAt) {
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines;
+  lines.reserve(kLineCount);
+  for (int lineno = 1; lineno <= kLineCount; ++lineno) {
+    lines.push_back(RandomLine(lineno, kLineWidth));
+  }
+
+  const auto path_to_file = "OpenInputFileMixedReadVsReadAt/object-name";
+  const auto path = PreexistingContainerPath() + path_to_file;
+
+  UploadLines(lines, path_to_file, kLineCount * kLineWidth);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       fs_->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    std::array<char, kLineWidth> scratch{};
+    std::int64_t bytes_read;
+
+    // Verify sequential reads work as expected: first into a fresh buffer...
+    {
+      ASSERT_OK_AND_ASSIGN(auto line_buffer, file->Read(kLineWidth));
+      EXPECT_EQ(lines[2 * i], line_buffer->ToString());
+    }
+    // ...then into caller-provided memory.
+    {
+      ASSERT_OK_AND_ASSIGN(bytes_read, file->Read(scratch.size(), scratch.data()));
+      EXPECT_EQ(bytes_read, kLineWidth);
+      const std::string line_text(scratch.data(), scratch.size());
+      EXPECT_EQ(lines[2 * i + 1], line_text);
+    }
+
+    // Verify random reads interleave too.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK_AND_ASSIGN(bytes_read,
+                         file->ReadAt(position, scratch.size(), scratch.data()));
+    EXPECT_EQ(bytes_read, kLineWidth);
+    const std::string random_line(scratch.data(), scratch.size());
+    EXPECT_EQ(lines[index], random_line);
+
+    // Verify random reads using buffers work.
+    ASSERT_OK_AND_ASSIGN(auto random_buffer, file->ReadAt(position, kLineWidth));
+    EXPECT_EQ(lines[index], random_buffer->ToString());
+  }
+}
+
+// Seeks to randomly chosen line starts and checks that the following Read()
+// returns exactly that line.
+TEST_F(TestAzureFileSystem, OpenInputFileRandomSeek) {
+  // Create a file large enough to make the random access tests non-trivial.
+  auto constexpr kLineWidth = 100;
+  auto constexpr kLineCount = 4096;
+  std::vector<std::string> lines(kLineCount);
+  int lineno = 0;
+  std::generate_n(lines.begin(), lines.size(),
+                  [&] { return RandomLine(++lineno, kLineWidth); });
+
+  const auto path_to_file = "OpenInputFileRandomSeek/object-name";
+  const auto path = PreexistingContainerPath() + path_to_file;
+
+  UploadLines(lines, path_to_file, kLineCount * kLineWidth);
+
+  std::shared_ptr<io::RandomAccessFile> file;
+  ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path));
+  for (int i = 0; i != 32; ++i) {
+    SCOPED_TRACE("Iteration " + std::to_string(i));
+    // Verify that a read after a seek starts at the requested position.
+    auto const index = RandomIndex(kLineCount);
+    auto const position = index * kLineWidth;
+    ASSERT_OK(file->Seek(position));
+    ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth));
+    EXPECT_EQ(lines[index], actual->ToString());
+  }
+}
+
+// A file opened through the filesystem reports the same IO context external id
+// as the filesystem itself.
+TEST_F(TestAzureFileSystem, OpenInputFileIoContext) {
+  // Create a test file.
+  const auto path_to_file = "OpenInputFileIoContext/object-name";
+  const auto path = PreexistingContainerPath() + path_to_file;
+  const std::string contents = "The quick brown fox jumps over the lazy dog";
+
+  auto container_client =
+      service_client_->GetBlobContainerClient(PreexistingContainerName());
+  auto blob_client = container_client.GetBlockBlobClient(path_to_file);
+  const auto* payload = reinterpret_cast<const uint8_t*>(contents.data());
+  blob_client.UploadFrom(payload, contents.length());
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       fs_->OpenInputFile(path));
+  EXPECT_EQ(fs_->io_context().external_id(), file->io_context().external_id());
+}
+
+// Opens the pre-existing blob via a hand-built FileInfo and checks that a
+// positional read from offset 16 returns the remainder of the uploaded text.
+TEST_F(TestAzureFileSystem, OpenInputFileInfo) {
+  // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info,
+  // fs->GetFileInfo(PreexistingObjectPath()));
+  const arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File);
+
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<io::RandomAccessFile> file,
+                       fs_->OpenInputFile(info));
+
+  auto constexpr kStart = 16;
+  std::array<char, 1024> scratch{};
+  ASSERT_OK_AND_ASSIGN(std::int64_t bytes_read,
+                       file->ReadAt(kStart, scratch.size(), scratch.data()));
+
+  const std::string whole_text(kLoremIpsum);
+  const std::string expected_tail = whole_text.substr(kStart);
+  EXPECT_EQ(std::string(scratch.data(), bytes_read), expected_tail);
+}
+
+// Opening a file on a blob that was never uploaded fails with IOError.
+TEST_F(TestAzureFileSystem, OpenInputFileNotFound) {
+  const auto missing_path = NotFoundObjectPath();
+  ASSERT_RAISES(IOError, fs_->OpenInputFile(missing_path));
+}
+
+// Opening a file from a FileInfo that points at the container path, or that is
+// marked NotFound, fails with IOError.
+TEST_F(TestAzureFileSystem, OpenInputFileInfoInvalid) {
+  // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info,
+  // fs->GetFileInfo(PreexistingContainerPath()));
+  const arrow::fs::FileInfo container_info(PreexistingContainerPath(), FileType::File);
+  ASSERT_RAISES(IOError, fs_->OpenInputFile(container_info));
+
+  // TODO(GH-38335): When implemented use ASSERT_OK_AND_ASSIGN(info,
+  // fs->GetFileInfo(NotFoundObjectPath()));
+  const arrow::fs::FileInfo missing_info(NotFoundObjectPath(), FileType::NotFound);
+  ASSERT_RAISES(IOError, fs_->OpenInputFile(missing_info));
+}
+
+// After Close(), every positional and sequential operation on the file is
+// rejected as Invalid.
+TEST_F(TestAzureFileSystem, OpenInputFileClosed) {
+  ASSERT_OK_AND_ASSIGN(auto closed_file, fs_->OpenInputFile(PreexistingObjectPath()));
+  ASSERT_OK(closed_file->Close());
+
+  std::array<char, 16> scratch{};
+  ASSERT_RAISES(Invalid, closed_file->Tell());
+  ASSERT_RAISES(Invalid, closed_file->Read(scratch.size(), scratch.data()));
+  ASSERT_RAISES(Invalid, closed_file->Read(scratch.size()));
+  ASSERT_RAISES(Invalid, closed_file->ReadAt(1, scratch.size(), scratch.data()));
+  ASSERT_RAISES(Invalid, closed_file->ReadAt(1, 1));
+  ASSERT_RAISES(Invalid, closed_file->Seek(2));
+}
+
} // namespace
} // namespace fs
} // namespace arrow