pitrou commented on a change in pull request #10955: URL: https://github.com/apache/arrow/pull/10955#discussion_r718348758
########## File path: cpp/src/arrow/filesystem/test_util.cc ########## @@ -167,6 +165,27 @@ void AssertFileInfo(FileSystem* fs, const std::string& path, FileType type, AssertFileInfo(info, path, type, size); } +GatedMockFilesystem::GatedMockFilesystem(TimePoint current_time, + const io::IOContext& io_context) + : internal::MockFileSystem(current_time, io_context) {} +GatedMockFilesystem::~GatedMockFilesystem() = default; + +Result<std::shared_ptr<io::OutputStream>> GatedMockFilesystem::OpenOutputStream( + const std::string& path, const std::shared_ptr<const KeyValueMetadata>& metadata) { + RETURN_NOT_OK(open_output_sem_.Acquire(1)); + return MockFileSystem::OpenOutputStream(path, metadata); +} + +// Wait until at least num_waiters are waiting on OpenOutputStream Review comment: These descriptions should be put in the `.h`. ########## File path: cpp/src/arrow/dataset/dataset_writer.h ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <string> + +#include "arrow/dataset/file_base.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/util/async_util.h" +#include "arrow/util/future.h" + +namespace arrow { +namespace dataset { + +constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024; + +/// \brief Utility class that manages a set of writers to different paths +/// +/// Writers may be closed and reopened (and a new file created) based on the dataset +/// write options (for example, max_rows_per_file or max_open_files) +/// +/// The dataset writer enforces its own back pressure based on the # of rows (as opposed +/// to # of batches which is how it is typically enforced elsewhere) and # of files. +class ARROW_DS_EXPORT DatasetWriter { + public: + /// \brief Creates a dataset writer + /// + /// Will fail if basename_template is invalid or if there is existing data and + /// existing_data_behavior is kError + /// + /// \param write_options options to control how the data should be written + /// \param max_rows_queued max # of rows allowed to be queued before the dataset_writer + /// will ask for backpressure + static Result<std::unique_ptr<DatasetWriter>> Make( + FileSystemDatasetWriteOptions write_options, + uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued); + + ~DatasetWriter(); + + /// \brief Writes a batch to the dataset + /// \param[in] batch The batch to write + /// \param[in] directory The directory to write to + /// + /// Note: The written filename will be {directory}/{filename_factory(i)} where i is a + /// counter controlled by `max_open_files` and `max_rows_per_file` + /// + /// If multiple WriteRecordBatch calls arrive with the same `directory` then the batches + /// may be written to the same file. + /// + /// The returned future will be marked finished when the record batch has been queued + /// to be written. 
If the returned future is unfinished then this indicates the dataset + /// writer's queue is full and the data provider should pause. + /// + /// This method is NOT async reentrant. The returned future will only be incomplete Review comment: What does "incomplete" mean? ########## File path: cpp/src/arrow/dataset/dataset_writer.h ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "arrow/dataset/file_base.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/util/async_util.h" +#include "arrow/util/future.h" + +namespace arrow { +namespace dataset { + +constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024; + +/// \brief Utility class that manages a set of writers to different paths +/// +/// Writers may be closed and reopened (and a new file created) based on the dataset +/// write options (for example, max_rows_per_file or max_open_files) +/// +/// The dataset writer enforces its own back pressure based on the # of rows (as opposed +/// to # of batches which is how it is typically enforced elsewhere) and # of files. 
+class ARROW_DS_EXPORT DatasetWriter { + public: + /// \brief Creates a dataset writer Review comment: "Create" if we follow the convention of using infinitives. ########## File path: cpp/src/arrow/dataset/dataset_writer_test.cc ########## @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/dataset/dataset_writer.h" + +#include <chrono> +#include <mutex> +#include <vector> + +#include "arrow/dataset/file_ipc.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/optional.h" +#include "gtest/gtest.h" + +namespace arrow { +namespace dataset { + +using arrow::fs::internal::MockFileInfo; +using arrow::fs::internal::MockFileSystem; + +struct ExpectedFile { + std::string filename; + uint64_t start; + uint64_t num_rows; +}; + +class DatasetWriterTestFixture : public testing::Test { + protected: + void SetUp() override { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs, + MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir")})); + filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); + schema_ = schema({field("int64", int64())}); + write_options_.filesystem = filesystem_; + write_options_.basename_template = "part-{i}.arrow"; + write_options_.base_dir = "testdir"; + write_options_.writer_pre_finish = [this](FileWriter* writer) { + pre_finish_visited_.push_back(writer->destination().path); + return Status::OK(); + }; + write_options_.writer_post_finish = [this](FileWriter* writer) { + post_finish_visited_.push_back(writer->destination().path); + return Status::OK(); + }; + std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>(); + write_options_.file_write_options = format->DefaultWriteOptions(); + } + + std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now); + ARROW_EXPECT_OK(fs->CreateDir("testdir")); + write_options_.filesystem = fs; + filesystem_ = fs; + return fs; + } + + std::shared_ptr<RecordBatch> 
MakeBatch(uint64_t start, uint64_t num_rows) { + Int64Builder builder; + for (uint64_t i = 0; i < num_rows; i++) { + ARROW_EXPECT_OK(builder.Append(i + start)); + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish()); + return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), {std::move(arr)}); + } + + std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) { + std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows); + counter_ += num_rows; + return batch; + } + + util::optional<MockFileInfo> FindFile(const std::string& filename) { + for (const auto& mock_file : filesystem_->AllFiles()) { + if (mock_file.full_path == filename) { + return mock_file; + } + } + return util::nullopt; + } + + void AssertVisited(const std::vector<std::string>& actual_paths, + const std::string& expected_path) { + std::vector<std::string>::const_iterator found = Review comment: Can use `const auto` for conciseness. ########## File path: cpp/src/arrow/filesystem/test_util.h ########## @@ -38,6 +40,23 @@ static inline FileInfo Dir(std::string path) { return FileInfo(std::move(path), FileType::Directory); } +class ARROW_TESTING_EXPORT GatedMockFilesystem : public internal::MockFileSystem { Review comment: Add a comment saying what this does? ########## File path: r/R/dataset-write.R ########## @@ -111,7 +111,7 @@ write_dataset <- function(dataset, dataset <- dplyr::ungroup(dataset) } - scanner <- Scanner$create(dataset) + scanner <- Scanner$create(dataset, use_async=TRUE) Review comment: Can you fix the R warning below? ########## File path: cpp/src/arrow/dataset/dataset_writer_test.cc ########## @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/dataset/dataset_writer.h" + +#include <chrono> +#include <mutex> +#include <vector> + +#include "arrow/dataset/file_ipc.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/optional.h" +#include "gtest/gtest.h" + +namespace arrow { +namespace dataset { + +using arrow::fs::internal::MockFileInfo; +using arrow::fs::internal::MockFileSystem; + +struct ExpectedFile { Review comment: Put this inside the fixture below? ########## File path: cpp/src/arrow/util/counting_semaphore.h ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_COUNTING_SEMAPHORE_H +#define ARROW_COUNTING_SEMAPHORE_H + +#include <memory> + +#include "arrow/status.h" + +namespace arrow { +namespace util { + +/// \brief Simple mutex-based counting semaphore with timeout +class ARROW_EXPORT CountingSemaphore { + public: + /// \brief Create an instance with initial_avail starting permits + /// + /// \param[in] initial_avail The semaphore will start with this many permits available + /// \param[in] timeout_seconds A timeout to be applied to all operations. Operations + /// will return Status::Invalid if this timeout elapses + explicit CountingSemaphore(uint32_t initial_avail = 0, double timeout_seconds = 10); + ~CountingSemaphore(); + /// \brief Block until num_permits permits are available + Status Acquire(uint32_t num_permits); + /// \brief Make num_permits permits available + Status Release(uint32_t num_permits); + /// \brief Waits until num_waiters are waiting on permits Review comment: "Wait" ########## File path: cpp/src/arrow/testing/gtest_util.h ########## @@ -94,7 +94,7 @@ #define ARROW_EXPECT_OK(expr) \ do { \ - auto _res = (expr); \ + const auto& _res = (expr); \ Review comment: Are we sure it is ok to take a const-ref here? Presumably `(expr)` can return an rvalue. ########## File path: cpp/src/arrow/util/counting_semaphore.h ########## @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_COUNTING_SEMAPHORE_H +#define ARROW_COUNTING_SEMAPHORE_H + +#include <memory> + +#include "arrow/status.h" + +namespace arrow { +namespace util { + +/// \brief Simple mutex-based counting semaphore with timeout +class ARROW_EXPORT CountingSemaphore { + public: + /// \brief Create an instance with initial_avail starting permits + /// + /// \param[in] initial_avail The semaphore will start with this many permits available + /// \param[in] timeout_seconds A timeout to be applied to all operations. Operations + /// will return Status::Invalid if this timeout elapses + explicit CountingSemaphore(uint32_t initial_avail = 0, double timeout_seconds = 10); + ~CountingSemaphore(); + /// \brief Block until num_permits permits are available + Status Acquire(uint32_t num_permits); + /// \brief Make num_permits permits available + Status Release(uint32_t num_permits); + /// \brief Waits until num_waiters are waiting on permits Review comment: Basically, this is using the semaphore as a barrier? ########## File path: cpp/src/arrow/dataset/dataset_writer_test.cc ########## @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/dataset/dataset_writer.h" + +#include <chrono> +#include <mutex> +#include <vector> + +#include "arrow/dataset/file_ipc.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/optional.h" +#include "gtest/gtest.h" + +namespace arrow { +namespace dataset { + +using arrow::fs::internal::MockFileInfo; +using arrow::fs::internal::MockFileSystem; + +struct ExpectedFile { + std::string filename; + uint64_t start; + uint64_t num_rows; +}; + +class DatasetWriterTestFixture : public testing::Test { + protected: + void SetUp() override { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs, + MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir")})); + filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); + schema_ = schema({field("int64", int64())}); + write_options_.filesystem = filesystem_; + write_options_.basename_template = "part-{i}.arrow"; + write_options_.base_dir = "testdir"; + write_options_.writer_pre_finish = [this](FileWriter* writer) { + pre_finish_visited_.push_back(writer->destination().path); + return Status::OK(); + }; + write_options_.writer_post_finish = 
[this](FileWriter* writer) { + post_finish_visited_.push_back(writer->destination().path); + return Status::OK(); + }; + std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>(); + write_options_.file_write_options = format->DefaultWriteOptions(); + } + + std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now); + ARROW_EXPECT_OK(fs->CreateDir("testdir")); + write_options_.filesystem = fs; + filesystem_ = fs; + return fs; + } + + std::shared_ptr<RecordBatch> MakeBatch(uint64_t start, uint64_t num_rows) { + Int64Builder builder; + for (uint64_t i = 0; i < num_rows; i++) { + ARROW_EXPECT_OK(builder.Append(i + start)); + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish()); + return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), {std::move(arr)}); + } + + std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) { + std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows); + counter_ += num_rows; + return batch; + } + + util::optional<MockFileInfo> FindFile(const std::string& filename) { + for (const auto& mock_file : filesystem_->AllFiles()) { + if (mock_file.full_path == filename) { + return mock_file; + } + } + return util::nullopt; + } + + void AssertVisited(const std::vector<std::string>& actual_paths, + const std::string& expected_path) { + std::vector<std::string>::const_iterator found = + std::find(actual_paths.begin(), actual_paths.end(), expected_path); + ASSERT_NE(found, actual_paths.end()) + << "The file " << expected_path << " was not in the list of files visited"; + } + + std::shared_ptr<RecordBatch> ReadAsBatch(util::string_view data) { + std::shared_ptr<io::RandomAccessFile> in_stream = + std::make_shared<io::BufferReader>(data); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader, + ipc::RecordBatchFileReader::Open(std::move(in_stream))); + 
RecordBatchVector batches; + for (int i = 0; i < reader->num_record_batches(); i++) { + EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> next_batch, + reader->ReadRecordBatch(i)); + batches.push_back(next_batch); + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> table, Table::FromRecordBatches(batches)); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> combined_table, table->CombineChunks()); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> batch, + TableBatchReader(*combined_table).Next()); + return batch; + } + + void AssertFileCreated(const util::optional<MockFileInfo>& maybe_file, + const std::string& expected_filename) { + ASSERT_TRUE(maybe_file.has_value()) + << "The file " << expected_filename << " was not created"; + { + SCOPED_TRACE("pre_finish"); + AssertVisited(pre_finish_visited_, expected_filename); + } + { + SCOPED_TRACE("post_finish"); + AssertVisited(post_finish_visited_, expected_filename); + } + } + + void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) { + counter_ = 0; + for (const auto& expected_file : expected_files) { + util::optional<MockFileInfo> written_file = FindFile(expected_file.filename); + AssertFileCreated(written_file, expected_file.filename); + AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows), + *ReadAsBatch(written_file->data)); + } + } + + void AssertFilesCreated(const std::vector<std::string>& expected_files) { + for (const std::string& expected_file : expected_files) { + util::optional<MockFileInfo> written_file = FindFile(expected_file); + AssertFileCreated(written_file, expected_file); + } + } + + void AssertNotFiles(const std::vector<std::string>& expected_non_files) { + for (const auto& expected_non_file : expected_non_files) { + util::optional<MockFileInfo> file = FindFile(expected_non_file); + ASSERT_FALSE(file.has_value()); + } + } + + void AssertEmptyFiles(const std::vector<std::string>& expected_empty_files) { + for (const auto& expected_empty_file : expected_empty_files) 
{ + util::optional<MockFileInfo> file = FindFile(expected_empty_file); + ASSERT_TRUE(file.has_value()); + ASSERT_EQ("", file->data); + } + } + + std::shared_ptr<MockFileSystem> filesystem_; + std::shared_ptr<Schema> schema_; + std::vector<std::string> pre_finish_visited_; + std::vector<std::string> post_finish_visited_; + FileSystemDatasetWriteOptions write_options_; + uint64_t counter_ = 0; +}; + +TEST_F(DatasetWriterTestFixture, Basic) { + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); + AssertFinished(queue_fut); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 100}}); +} + +TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) { + write_options_.max_rows_per_file = 10; + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(35), ""); + AssertFinished(queue_fut); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 10}, + {"testdir/part-1.arrow", 10, 10}, + {"testdir/part-2.arrow", 20, 10}, + {"testdir/part-3.arrow", 30, 5}}); +} + +TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) { + write_options_.max_rows_per_file = 10; + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 10}, {"testdir/part-1.arrow", 10, 8}}); 
+} + +TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) { + auto gated_fs = UseGatedFs(); + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + for (int i = 0; i < 10; i++) { + Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), ""); + AssertFinished(queue_fut); + ASSERT_FINISHES_OK(queue_fut); + } + ASSERT_OK(gated_fs->WaitForOpenOutputStream(1)); + ASSERT_OK(gated_fs->UnlockOpenOutputStream(1)); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 100}}); +} + +TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) { + // NBATCHES must be less than I/O executor concurrency to avoid deadlock / test failure + constexpr int NBATCHES = 6; + auto gated_fs = UseGatedFs(); + std::vector<ExpectedFile> expected_files; + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + for (int i = 0; i < NBATCHES; i++) { + std::string i_str = std::to_string(i); + expected_files.push_back(ExpectedFile{"testdir/part" + i_str + "/part-0.arrow", + static_cast<uint64_t>(i) * 10, 10}); + Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "part" + i_str); + AssertFinished(queue_fut); + ASSERT_FINISHES_OK(queue_fut); + } + ASSERT_OK(gated_fs->WaitForOpenOutputStream(NBATCHES)); + ASSERT_OK(gated_fs->UnlockOpenOutputStream(NBATCHES)); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData(expected_files); +} + +TEST_F(DatasetWriterTestFixture, MaxOpenFiles) { + auto gated_fs = UseGatedFs(); + write_options_.max_open_files = 2; + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0")); + Future<> fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "part2"); + // 
Backpressure will be applied until an existing file can be evicted + AssertNotFinished(fut); + + // Ungate the writes to relieve the pressure, testdir/part0 should be closed + ASSERT_OK(gated_fs->WaitForOpenOutputStream(2)); + ASSERT_OK(gated_fs->UnlockOpenOutputStream(5)); + ASSERT_FINISHES_OK(fut); + + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0")); + // Following call should resume existing write but, on slow test systems, the old + // write may have already been finished + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1")); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertFilesCreated({"testdir/part0/part-0.arrow", "testdir/part0/part-1.arrow", + "testdir/part1/part-0.arrow", "testdir/part2/part-0.arrow"}); +} + +TEST_F(DatasetWriterTestFixture, DeleteExistingData) { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs, + MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir"), + fs::File("testdir/part-5.arrow"), + fs::File("testdir/blah.txt")})); Review comment: Should you also add a subdir and file inside it? ########## File path: cpp/src/arrow/dataset/dataset_writer.h ########## @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "arrow/dataset/file_base.h" +#include "arrow/record_batch.h" +#include "arrow/status.h" +#include "arrow/util/async_util.h" +#include "arrow/util/future.h" + +namespace arrow { +namespace dataset { + +constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024; + +/// \brief Utility class that manages a set of writers to different paths +/// +/// Writers may be closed and reopened (and a new file created) based on the dataset +/// write options (for example, max_rows_per_file or max_open_files) +/// +/// The dataset writer enforces its own back pressure based on the # of rows (as opposed +/// to # of batches which is how it is typically enforced elsewhere) and # of files. +class ARROW_DS_EXPORT DatasetWriter { Review comment: Is this for internal use? If so, perhaps say so in the docstring (and/or put it inside `arrow::dataset::internal`). ########## File path: cpp/src/arrow/dataset/dataset_writer_test.cc ########## @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/dataset/dataset_writer.h" + +#include <chrono> +#include <mutex> +#include <vector> + +#include "arrow/dataset/file_ipc.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/optional.h" +#include "gtest/gtest.h" + +namespace arrow { +namespace dataset { + +using arrow::fs::internal::MockFileInfo; +using arrow::fs::internal::MockFileSystem; + +struct ExpectedFile { + std::string filename; + uint64_t start; + uint64_t num_rows; +}; + +class DatasetWriterTestFixture : public testing::Test { + protected: + void SetUp() override { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs, + MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir")})); + filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); + schema_ = schema({field("int64", int64())}); + write_options_.filesystem = filesystem_; + write_options_.basename_template = "part-{i}.arrow"; + write_options_.base_dir = "testdir"; + write_options_.writer_pre_finish = [this](FileWriter* writer) { + pre_finish_visited_.push_back(writer->destination().path); + return Status::OK(); + }; + write_options_.writer_post_finish = [this](FileWriter* writer) { + post_finish_visited_.push_back(writer->destination().path); + return Status::OK(); + }; + std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>(); + write_options_.file_write_options = format->DefaultWriteOptions(); + } + + std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now); + ARROW_EXPECT_OK(fs->CreateDir("testdir")); + write_options_.filesystem = fs; + filesystem_ = fs; + return fs; + } + + std::shared_ptr<RecordBatch> 
MakeBatch(uint64_t start, uint64_t num_rows) { + Int64Builder builder; + for (uint64_t i = 0; i < num_rows; i++) { + ARROW_EXPECT_OK(builder.Append(i + start)); + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish()); + return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), {std::move(arr)}); + } + + std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) { + std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows); + counter_ += num_rows; + return batch; + } + + util::optional<MockFileInfo> FindFile(const std::string& filename) { + for (const auto& mock_file : filesystem_->AllFiles()) { + if (mock_file.full_path == filename) { + return mock_file; + } + } + return util::nullopt; + } + + void AssertVisited(const std::vector<std::string>& actual_paths, + const std::string& expected_path) { + std::vector<std::string>::const_iterator found = + std::find(actual_paths.begin(), actual_paths.end(), expected_path); + ASSERT_NE(found, actual_paths.end()) + << "The file " << expected_path << " was not in the list of files visited"; + } + + std::shared_ptr<RecordBatch> ReadAsBatch(util::string_view data) { + std::shared_ptr<io::RandomAccessFile> in_stream = + std::make_shared<io::BufferReader>(data); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader, + ipc::RecordBatchFileReader::Open(std::move(in_stream))); + RecordBatchVector batches; + for (int i = 0; i < reader->num_record_batches(); i++) { + EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> next_batch, + reader->ReadRecordBatch(i)); + batches.push_back(next_batch); + } + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> table, Table::FromRecordBatches(batches)); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> combined_table, table->CombineChunks()); + EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> batch, + TableBatchReader(*combined_table).Next()); + return batch; + } + + void AssertFileCreated(const util::optional<MockFileInfo>& maybe_file, + const std::string& 
expected_filename) { + ASSERT_TRUE(maybe_file.has_value()) + << "The file " << expected_filename << " was not created"; + { + SCOPED_TRACE("pre_finish"); + AssertVisited(pre_finish_visited_, expected_filename); + } + { + SCOPED_TRACE("post_finish"); + AssertVisited(post_finish_visited_, expected_filename); + } + } + + void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) { + counter_ = 0; + for (const auto& expected_file : expected_files) { + util::optional<MockFileInfo> written_file = FindFile(expected_file.filename); + AssertFileCreated(written_file, expected_file.filename); + AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows), + *ReadAsBatch(written_file->data)); + } + } + + void AssertFilesCreated(const std::vector<std::string>& expected_files) { + for (const std::string& expected_file : expected_files) { + util::optional<MockFileInfo> written_file = FindFile(expected_file); + AssertFileCreated(written_file, expected_file); + } + } + + void AssertNotFiles(const std::vector<std::string>& expected_non_files) { + for (const auto& expected_non_file : expected_non_files) { + util::optional<MockFileInfo> file = FindFile(expected_non_file); + ASSERT_FALSE(file.has_value()); + } + } + + void AssertEmptyFiles(const std::vector<std::string>& expected_empty_files) { + for (const auto& expected_empty_file : expected_empty_files) { + util::optional<MockFileInfo> file = FindFile(expected_empty_file); + ASSERT_TRUE(file.has_value()); + ASSERT_EQ("", file->data); + } + } + + std::shared_ptr<MockFileSystem> filesystem_; + std::shared_ptr<Schema> schema_; + std::vector<std::string> pre_finish_visited_; + std::vector<std::string> post_finish_visited_; + FileSystemDatasetWriteOptions write_options_; + uint64_t counter_ = 0; +}; + +TEST_F(DatasetWriterTestFixture, Basic) { + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), ""); + 
AssertFinished(queue_fut); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 100}}); +} + +TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) { + write_options_.max_rows_per_file = 10; + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(35), ""); + AssertFinished(queue_fut); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 10}, + {"testdir/part-1.arrow", 10, 10}, + {"testdir/part-2.arrow", 20, 10}, + {"testdir/part-3.arrow", 30, 5}}); +} + +TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) { + write_options_.max_rows_per_file = 10; + EXPECT_OK_AND_ASSIGN(auto dataset_writer, DatasetWriter::Make(write_options_)); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), "")); + ASSERT_FINISHES_OK(dataset_writer->Finish()); + AssertCreatedData({{"testdir/part-0.arrow", 0, 10}, {"testdir/part-1.arrow", 10, 8}}); +} + +TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) { + auto gated_fs = UseGatedFs(); Review comment: Can you add a comment explaining what this tests? It's not obvious to me how this adds to the previous tests. ########## File path: cpp/src/arrow/util/CMakeLists.txt ########## @@ -48,6 +48,7 @@ add_arrow_test(utility-test cache_test.cc checked_cast_test.cc compression_test.cc + counting_semaphore_test.cc Review comment: Perhaps move this into threading-utility-test? 
########## File path: cpp/src/arrow/dataset/dataset_writer_test.cc ########## @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/dataset/dataset_writer.h" + +#include <chrono> +#include <mutex> +#include <vector> + +#include "arrow/dataset/file_ipc.h" +#include "arrow/filesystem/mockfs.h" +#include "arrow/filesystem/test_util.h" +#include "arrow/io/memory.h" +#include "arrow/ipc/reader.h" +#include "arrow/table.h" +#include "arrow/testing/future_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/util/optional.h" +#include "gtest/gtest.h" + +namespace arrow { +namespace dataset { + +using arrow::fs::internal::MockFileInfo; +using arrow::fs::internal::MockFileSystem; + +struct ExpectedFile { + std::string filename; + uint64_t start; + uint64_t num_rows; +}; + +class DatasetWriterTestFixture : public testing::Test { + protected: + void SetUp() override { + fs::TimePoint mock_now = std::chrono::system_clock::now(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs, + MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir")})); + filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs); + schema_ = schema({field("int64", int64())}); + write_options_.filesystem = 
filesystem_; + write_options_.basename_template = "part-{i}.arrow"; Review comment: Nit, but can you call it `chunk-{i}.arrow` or something? Seeing `part0/part-1.arrow` is confusing. ########## File path: cpp/src/arrow/testing/gtest_util.h ########## @@ -94,7 +94,7 @@ #define ARROW_EXPECT_OK(expr) \ do { \ - auto _res = (expr); \ + const auto& _res = (expr); \ Review comment: Perhaps @bkietz can advise on this. From a more abstract standpoint, I'm not sure applying IDE-driven suggestions is necessarily a good strategy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org