Tom-Newton commented on code in PR #38269: URL: https://github.com/apache/arrow/pull/38269#discussion_r1364500518
########## cpp/src/arrow/filesystem/azurefs_test.cc: ########## @@ -154,6 +137,330 @@ TEST(AzureFileSystem, OptionsCompare) { EXPECT_TRUE(options.Equals(options)); } +class TestAzureFileSystem : public ::testing::Test { + public: + std::shared_ptr<FileSystem> fs_; + std::shared_ptr<Azure::Storage::Blobs::BlobServiceClient> service_client_; + std::mt19937_64 generator_; + std::string container_name_; + + TestAzureFileSystem() : generator_(std::random_device()()) {} + + AzureOptions MakeOptions() { + const std::string& account_name = GetAzuriteEnv()->account_name(); + const std::string& account_key = GetAzuriteEnv()->account_key(); + AzureOptions options; + options.backend = AzureBackend::Azurite; + ARROW_EXPECT_OK(options.ConfigureAccountKeyCredentials(account_name, account_key)); + return options; + } + + void SetUp() override { + ASSERT_THAT(GetAzuriteEnv(), NotNull()); + ASSERT_OK(GetAzuriteEnv()->status()); + + container_name_ = RandomChars(32); + auto options = MakeOptions(); + service_client_ = std::make_shared<Azure::Storage::Blobs::BlobServiceClient>( + options.account_blob_url, options.storage_credentials_provider); + ASSERT_OK_AND_ASSIGN(fs_, AzureFileSystem::Make(options)); + auto container_client = service_client_->GetBlobContainerClient(container_name_); + container_client.CreateIfNotExists(); + + auto blob_client = container_client.GetBlockBlobClient(PreexistingObjectName()); + blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(kLoremIpsum), + strlen(kLoremIpsum)); + } + + void TearDown() override { + auto containers = service_client_->ListBlobContainers(); + for (auto container : containers.BlobContainers) { + auto container_client = service_client_->GetBlobContainerClient(container.Name); + container_client.DeleteIfExists(); + } + } + + std::string PreexistingContainerName() const { return container_name_; } + + std::string PreexistingContainerPath() const { + return PreexistingContainerName() + '/'; + } + + static std::string 
PreexistingObjectName() { return "test-object-name"; } + + std::string PreexistingObjectPath() const { + return PreexistingContainerPath() + PreexistingObjectName(); + } + + std::string NotFoundObjectPath() { return PreexistingContainerPath() + "not-found"; } + + std::string RandomLine(int lineno, std::size_t width) { + auto line = std::to_string(lineno) + ": "; + line += RandomChars(width - line.size() - 1); + line += '\n'; + return line; + } + + std::size_t RandomIndex(std::size_t end) { + return std::uniform_int_distribution<std::size_t>(0, end - 1)(generator_); + } + + std::string RandomChars(std::size_t count) { + auto const fillers = std::string("abcdefghijlkmnopqrstuvwxyz0123456789"); + std::uniform_int_distribution<std::size_t> d(0, fillers.size() - 1); + std::string s; + std::generate_n(std::back_inserter(s), count, [&] { return fillers[d(generator_)]; }); + return s; + } + + void UploadLines(const std::vector<std::string>& lines, const char* path_to_file, + int total_size) { + // TODO: Switch to using Azure filesystem to write once its implemented. 
+ auto blob_client = service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path_to_file); + std::string all_lines = std::accumulate(lines.begin(), lines.end(), std::string("")); + blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(all_lines.data()), + total_size); + } +}; + +TEST_F(TestAzureFileSystem, OpenInputStreamString) { + std::shared_ptr<io::InputStream> stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath())); + + std::array<char, 1024> buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + + EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamStringBuffers) { + std::shared_ptr<io::InputStream> stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath())); + + std::string contents; + std::shared_ptr<Buffer> buffer; + do { + ASSERT_OK_AND_ASSIGN(buffer, stream->Read(16)); + contents.append(buffer->ToString()); + } while (buffer && buffer->size() != 0); + + EXPECT_EQ(contents, kLoremIpsum); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamInfo) { + // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingObjectPath())); + arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File); + + std::shared_ptr<io::InputStream> stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(info)); + + std::array<char, 1024> buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + + EXPECT_EQ(std::string(buffer.data(), size), kLoremIpsum); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamEmpty) { + const auto path_to_file = "empty-object.txt"; + const auto path = PreexistingContainerPath() + path_to_file; + service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path_to_file) + .UploadFrom(nullptr, 0); + + ASSERT_OK_AND_ASSIGN(auto stream, 
fs_->OpenInputStream(path)); + std::array<char, 1024> buffer{}; + std::int64_t size; + ASSERT_OK_AND_ASSIGN(size, stream->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, 0); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamNotFound) { + ASSERT_RAISES(IOError, fs_->OpenInputStream(NotFoundObjectPath())); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamInfoInvalid) { + // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingBucketPath())); + arrow::fs::FileInfo info(PreexistingContainerPath(), FileType::Directory); + ASSERT_RAISES(IOError, fs_->OpenInputStream(info)); + + // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(NotFoundObjectPath())); + arrow::fs::FileInfo info2(PreexistingContainerPath(), FileType::NotFound); + ASSERT_RAISES(IOError, fs_->OpenInputStream(info2)); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamUri) { + ASSERT_RAISES(Invalid, fs_->OpenInputStream("abfss://" + PreexistingObjectPath())); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamTrailingSlash) { + ASSERT_RAISES(IOError, fs_->OpenInputStream(PreexistingObjectPath() + '/')); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamReadMetadata) { + const std::string object_name = "OpenInputStreamMetadataTest/simple.txt"; + + service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlobClient(PreexistingObjectName()) + .SetMetadata(Azure::Storage::Metadata{{"key0", "value0"}}); + + std::shared_ptr<io::InputStream> stream; + ASSERT_OK_AND_ASSIGN(stream, fs_->OpenInputStream(PreexistingObjectPath())); + + std::shared_ptr<const KeyValueMetadata> actual; + ASSERT_OK_AND_ASSIGN(actual, stream->ReadMetadata()); + ASSERT_OK_AND_EQ("value0", actual->Get("key0")); +} + +TEST_F(TestAzureFileSystem, OpenInputStreamClosed) { + ASSERT_OK_AND_ASSIGN(auto stream, fs_->OpenInputStream(PreexistingObjectPath())); + ASSERT_OK(stream->Close()); + std::array<char, 16> buffer{}; + ASSERT_RAISES(Invalid, stream->Read(buffer.size(), 
buffer.data())); + ASSERT_RAISES(Invalid, stream->Read(buffer.size())); + ASSERT_RAISES(Invalid, stream->Tell()); +} + +TEST_F(TestAzureFileSystem, OpenInputFileMixedReadVsReadAt) { + // Create a file large enough to make the random access tests non-trivial. + auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector<std::string> lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path_to_file = "OpenInputFileMixedReadVsReadAt/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + + UploadLines(lines, path_to_file, kLineCount * kLineWidth); + + std::shared_ptr<io::RandomAccessFile> file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify sequential reads work as expected. + std::array<char, kLineWidth> buffer{}; + std::int64_t size; + { + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[2 * i], actual->ToString()); + } + { + ASSERT_OK_AND_ASSIGN(size, file->Read(buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[2 * i + 1], actual); + } + + // Verify random reads interleave too. + auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(position, buffer.size(), buffer.data())); + EXPECT_EQ(size, kLineWidth); + auto actual = std::string{buffer.begin(), buffer.end()}; + EXPECT_EQ(lines[index], actual); + + // Verify random reads using buffers work. + ASSERT_OK_AND_ASSIGN(auto b, file->ReadAt(position, kLineWidth)); + EXPECT_EQ(lines[index], b->ToString()); + } +} + +TEST_F(TestAzureFileSystem, OpenInputFileRandomSeek) { + // Create a file large enough to make the random access tests non-trivial. 
+ auto constexpr kLineWidth = 100; + auto constexpr kLineCount = 4096; + std::vector<std::string> lines(kLineCount); + int lineno = 0; + std::generate_n(lines.begin(), lines.size(), + [&] { return RandomLine(++lineno, kLineWidth); }); + + const auto path_to_file = "OpenInputFileRandomSeek/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + std::shared_ptr<io::OutputStream> output; + + UploadLines(lines, path_to_file, kLineCount * kLineWidth); + + std::shared_ptr<io::RandomAccessFile> file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + for (int i = 0; i != 32; ++i) { + SCOPED_TRACE("Iteration " + std::to_string(i)); + // Verify sequential reads work as expected. + auto const index = RandomIndex(kLineCount); + auto const position = index * kLineWidth; + ASSERT_OK(file->Seek(position)); + ASSERT_OK_AND_ASSIGN(auto actual, file->Read(kLineWidth)); + EXPECT_EQ(lines[index], actual->ToString()); + } +} + +TEST_F(TestAzureFileSystem, OpenInputFileIoContext) { + // Create a test file. 
+ const auto path_to_file = "OpenInputFileIoContext/object-name"; + const auto path = PreexistingContainerPath() + path_to_file; + const std::string contents = "The quick brown fox jumps over the lazy dog"; + + auto blob_client = service_client_->GetBlobContainerClient(PreexistingContainerName()) + .GetBlockBlobClient(path_to_file); + blob_client.UploadFrom(reinterpret_cast<const uint8_t*>(contents.data()), + contents.length()); + + std::shared_ptr<io::RandomAccessFile> file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(path)); + EXPECT_EQ(fs_->io_context().external_id(), file->io_context().external_id()); +} + +TEST_F(TestAzureFileSystem, OpenInputFileInfo) { + // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingObjectPath())); + arrow::fs::FileInfo info(PreexistingObjectPath(), FileType::File); + + std::shared_ptr<io::RandomAccessFile> file; + ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile(info)); + + std::array<char, 1024> buffer{}; + std::int64_t size; + auto constexpr kStart = 16; + ASSERT_OK_AND_ASSIGN(size, file->ReadAt(kStart, buffer.size(), buffer.data())); + + auto const expected = std::string(kLoremIpsum).substr(kStart); + EXPECT_EQ(std::string(buffer.data(), size), expected); +} + +TEST_F(TestAzureFileSystem, OpenInputFileNotFound) { + ASSERT_RAISES(IOError, fs_->OpenInputFile(NotFoundObjectPath())); +} + +TEST_F(TestAzureFileSystem, OpenInputFileInfoInvalid) { + // TODO: When implemented use ASSERT_OK_AND_ASSIGN(info, + // fs->GetFileInfo(PreexistingContainerPath())); Review Comment: Actually it might require a check to determine if the storage account has hierarchical namespace enabled, at which point it could get quite complicated, but that is for another PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org