westonpace commented on a change in pull request #10955:
URL: https://github.com/apache/arrow/pull/10955#discussion_r718113676



##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,340 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+struct ExpectedFile {
+  std::string filename;
+  uint64_t start;
+  uint64_t num_rows;
+};
+
+class DatasetWriterTestFixture : public testing::Test {
+ protected:
+  void SetUp() override {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                         MockFileSystem::Make(mock_now, 
{::arrow::fs::Dir("testdir")}));
+    filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
+    schema_ = schema({field("int64", int64())});
+    write_options_.filesystem = filesystem_;
+    write_options_.basename_template = "part-{i}.arrow";
+    write_options_.base_dir = "testdir";
+    write_options_.writer_pre_finish = [this](FileWriter* writer) {
+      pre_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    write_options_.writer_post_finish = [this](FileWriter* writer) {
+      post_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>();
+    write_options_.file_write_options = format->DefaultWriteOptions();
+  }
+
+  std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now);
+    ARROW_EXPECT_OK(fs->CreateDir("testdir"));
+    write_options_.filesystem = fs;
+    filesystem_ = fs;
+    return fs;
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t start, uint64_t num_rows) {
+    Int64Builder builder;
+    for (uint64_t i = 0; i < num_rows; i++) {
+      ARROW_EXPECT_OK(builder.Append(i + start));
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish());
+    return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), 
{std::move(arr)});
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) {
+    std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows);
+    counter_ += num_rows;
+    return batch;
+  }
+
+  util::optional<MockFileInfo> FindFile(const std::string& filename) {
+    for (const auto& mock_file : filesystem_->AllFiles()) {
+      if (mock_file.full_path == filename) {
+        return mock_file;
+      }
+    }
+    return util::nullopt;
+  }
+
+  void AssertVisited(const std::vector<std::string>& actual_paths,
+                     const std::string& expected_path) {
+    std::vector<std::string>::const_iterator found =
+        std::find(actual_paths.begin(), actual_paths.end(), expected_path);
+    ASSERT_NE(found, actual_paths.end())
+        << "The file " << expected_path << " was not in the list of files 
visited";
+  }
+
+  std::shared_ptr<RecordBatch> ReadAsBatch(util::string_view data) {
+    std::shared_ptr<io::RandomAccessFile> in_stream =
+        std::make_shared<io::BufferReader>(data);
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader,
+                         
ipc::RecordBatchFileReader::Open(std::move(in_stream)));
+    RecordBatchVector batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> next_batch,
+                           reader->ReadRecordBatch(i));
+      batches.push_back(next_batch);
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> table, 
Table::FromRecordBatches(batches));
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> combined_table, 
table->CombineChunks());
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> batch,
+                         TableBatchReader(*combined_table).Next());
+    return batch;
+  }
+
+  void AssertFiles(const std::vector<ExpectedFile>& expected_files,
+                   bool verify_content = true) {
+    counter_ = 0;
+    for (const auto& expected_file : expected_files) {
+      util::optional<MockFileInfo> written_file = 
FindFile(expected_file.filename);
+      ASSERT_TRUE(written_file.has_value())
+          << "The file " << expected_file.filename << " was not created";
+      {
+        SCOPED_TRACE("pre_finish");
+        AssertVisited(pre_finish_visited_, expected_file.filename);
+      }
+      {
+        SCOPED_TRACE("post_finish");
+        AssertVisited(post_finish_visited_, expected_file.filename);
+      }
+      if (verify_content) {
+        AssertBatchesEqual(*MakeBatch(expected_file.start, 
expected_file.num_rows),
+                           *ReadAsBatch(written_file->data));
+      }
+    }
+  }
+
+  void AssertNotFiles(const std::vector<std::string>& expected_non_files) {
+    for (const auto& expected_non_file : expected_non_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_non_file);
+      ASSERT_FALSE(file.has_value());
+    }
+  }
+
+  void AssertEmptyFiles(const std::vector<std::string>& expected_empty_files) {
+    for (const auto& expected_empty_file : expected_empty_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_empty_file);
+      ASSERT_TRUE(file.has_value());
+      ASSERT_EQ("", file->data);
+    }
+  }
+
+  std::shared_ptr<MockFileSystem> filesystem_;
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::string> pre_finish_visited_;
+  std::vector<std::string> post_finish_visited_;
+  FileSystemDatasetWriteOptions write_options_;
+  uint64_t counter_ = 0;
+};
+
+TEST_F(DatasetWriterTestFixture, Basic) {
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
+  AssertFinished(queue_fut);
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 100}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(35), "");
+  AssertFinished(queue_fut);
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 10},
+               {"testdir/part-1.arrow", 10, 10},
+               {"testdir/part-2.arrow", 20, 10},
+               {"testdir/part-3.arrow", 30, 5}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 10}, {"testdir/part-1.arrow", 10, 
8}});
+}
+
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
+  auto gated_fs = UseGatedFs();
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  for (int i = 0; i < 10; i++) {
+    Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "");
+    AssertFinished(queue_fut);
+    ASSERT_FINISHES_OK(queue_fut);
+  }
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(1));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(1));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part-0.arrow", 0, 100}});
+}
+
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
+  // NBATCHES must be less than I/O executor concurrency to avoid deadlock / 
test failure
+  constexpr int NBATCHES = 6;
+  auto gated_fs = UseGatedFs();
+  std::vector<ExpectedFile> expected_files;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  for (int i = 0; i < NBATCHES; i++) {
+    std::string i_str = std::to_string(i);
+    expected_files.push_back(ExpectedFile{"testdir/part" + i_str + 
"/part-0.arrow",
+                                          static_cast<uint64_t>(i) * 10, 10});
+    Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), 
"part" + i_str);
+    AssertFinished(queue_fut);
+    ASSERT_FINISHES_OK(queue_fut);
+  }
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(NBATCHES));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(NBATCHES));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles(expected_files);
+}
+
+TEST_F(DatasetWriterTestFixture, MaxOpenFiles) {
+  auto gated_fs = UseGatedFs();
+  write_options_.max_open_files = 2;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1"));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  Future<> fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "part2");
+  // Backpressure will be applied until an existing file can be evicted
+  AssertNotFinished(fut);
+
+  // Ungate the writes to relieve the pressure, testdir/part0 should be closed
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(2));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(5));
+  ASSERT_FINISHES_OK(fut);
+
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  // Following call should resume existing write but, on slow test systems, 
the old
+  // write may have already been finished
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1"));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertFiles({{"testdir/part0/part-0.arrow", 0, 10},
+               {"testdir/part0/part-0.arrow", 20, 10},

Review comment:
       The issue wasn't non-determinism but simply the fact that I didn't want to 
extend the file verification logic to handle the case where there is a gap in 
row ids when that wasn't really the point of the test.  The file 
part0/part-0.arrow should contain rows 0-10 and rows 20-30.
   
   To make it more clear I updated the verification methods so there is now an 
`AssertCreatedData` (which checks both content and filenames as before) and an 
`AssertFilesCreated` (which checks only filenames and doesn't take in unused 
row indices/offsets).
   
   So hopefully this is a bit more clear.  The choice of file is deterministic 
in this test.  In truth, the dataset writer is very deterministic and 
controlled entirely by the order of calls to `WriteRecordBatch` (and this 
method is not reentrant).  In practice an actual dataset write might be less so 
because there is no telling what order `WriteRecordBatch` will be called in.
   
   I do have `AssertNotFiles` which I used in tests that delete existing data.

##########
File path: cpp/src/arrow/dataset/file_base.h
##########
@@ -343,6 +343,18 @@ class ARROW_DS_EXPORT FileWriter {
   fs::FileLocator destination_locator_;
 };
 
+/// \brief Controls what happens if files exist in an output directory during 
a dataset
+/// write
+enum ExistingDataBehavior : int8_t {
+  /// Deletes all files in a directory the first time that directory is 
encountered
+  kDeleteMatchingPartitions,
+  /// Ignores existing files, overwriting any that happen to have the same 
name as an
+  /// output file
+  kOverwriteOrIgnore,
+  /// Returns an error if there are any files or subdirectories in the output 
directory
+  kError,

Review comment:
       @jorisvandenbossche Do you want to create a JIRA for this issue?

##########
File path: cpp/src/arrow/dataset/file_base.h
##########
@@ -364,6 +376,18 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
   /// {i} will be replaced by an auto incremented integer.
   std::string basename_template;
 
+  /// If greater than 0 then this will limit the maximum number of files that 
can be left
+  /// open. If an attempt is made to open too many files then the least 
recently used file
+  /// will be closed.  If this setting is set too low you may end up 
fragmenting your data
+  /// into many small files.
+  uint32_t max_open_files = 1024;
+
+  /// If greater than 0 then this will limit how many rows are placed in any 
single file.

Review comment:
       I updated the comment to clarify what happens in that case.  Yes, it 
will create one file per directory.

##########
File path: cpp/src/arrow/dataset/dataset_writer.cc
##########
@@ -0,0 +1,529 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <list>
+#include <mutex>
+#include <unordered_map>
+
+#include "arrow/filesystem/path_util.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/util/future.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/map.h"
+#include "arrow/util/string.h"
+
+namespace arrow {
+namespace dataset {
+
+namespace {
+
+constexpr util::string_view kIntegerToken = "{i}";
+
+class Throttle {
+ public:
+  explicit Throttle(uint64_t max_value) : max_value_(max_value) {}
+
+  bool Unthrottled() const { return max_value_ <= 0; }
+
+  Future<> Acquire(uint64_t values) {
+    if (Unthrottled()) {
+      return Future<>::MakeFinished();
+    }
+    std::lock_guard<std::mutex> lg(mutex_);
+    if (values + current_value_ > max_value_) {
+      in_waiting_ = values;
+      backpressure_ = Future<>::Make();
+    } else {
+      current_value_ += values;
+    }
+    return backpressure_;
+  }
+
+  void Release(uint64_t values) {
+    if (Unthrottled()) {
+      return;
+    }
+    Future<> to_complete;
+    {
+      std::lock_guard<std::mutex> lg(mutex_);
+      current_value_ -= values;
+      if (in_waiting_ > 0 && in_waiting_ + current_value_ <= max_value_) {
+        in_waiting_ = 0;
+        to_complete = backpressure_;
+      }
+    }
+    if (to_complete.is_valid()) {
+      to_complete.MarkFinished();
+    }
+  }
+
+ private:
+  Future<> backpressure_ = Future<>::MakeFinished();
+  uint64_t max_value_;
+  uint64_t in_waiting_ = 0;
+  uint64_t current_value_ = 0;
+  std::mutex mutex_;
+};
+
+class DatasetWriterFileQueue : public util::AsyncDestroyable {
+ public:
+  explicit DatasetWriterFileQueue(const Future<std::shared_ptr<FileWriter>>& 
writer_fut,
+                                  const FileSystemDatasetWriteOptions& options,
+                                  std::mutex* visitors_mutex)
+      : options_(options), visitors_mutex_(visitors_mutex) {
+    running_task_ = Future<>::Make();
+    writer_fut.AddCallback(
+        [this](const Result<std::shared_ptr<FileWriter>>& maybe_writer) {
+          if (maybe_writer.ok()) {
+            writer_ = *maybe_writer;
+            Flush();
+          } else {
+            Abort(maybe_writer.status());
+          }
+        });
+  }
+
+  Future<uint64_t> Push(std::shared_ptr<RecordBatch> batch) {
+    std::unique_lock<std::mutex> lk(mutex);
+    write_queue_.push_back(std::move(batch));
+    Future<uint64_t> write_future = Future<uint64_t>::Make();
+    write_futures_.push_back(write_future);
+    if (!running_task_.is_valid()) {
+      running_task_ = Future<>::Make();
+      FlushUnlocked(std::move(lk));
+    }
+    return write_future;
+  }
+
+  Future<> DoDestroy() override {
+    std::lock_guard<std::mutex> lg(mutex);
+    if (!running_task_.is_valid()) {
+      RETURN_NOT_OK(DoFinish());
+      return Future<>::MakeFinished();
+    }
+    return running_task_.Then([this] { return DoFinish(); });
+  }
+
+ private:
+  Future<uint64_t> WriteNext() {
+    // May want to prototype / measure someday pushing the async write down 
further
+    return DeferNotOk(
+        io::default_io_context().executor()->Submit([this]() -> 
Result<uint64_t> {
+          DCHECK(running_task_.is_valid());
+          std::unique_lock<std::mutex> lk(mutex);
+          const std::shared_ptr<RecordBatch>& to_write = write_queue_.front();
+          Future<uint64_t> on_complete = write_futures_.front();
+          uint64_t rows_to_write = to_write->num_rows();
+          lk.unlock();
+          Status status = writer_->Write(to_write);
+          lk.lock();
+          write_queue_.pop_front();
+          write_futures_.pop_front();
+          lk.unlock();
+          if (!status.ok()) {
+            on_complete.MarkFinished(status);
+          } else {
+            on_complete.MarkFinished(rows_to_write);
+          }
+          return rows_to_write;
+        }));
+  }
+
+  Status DoFinish() {
+    {
+      std::lock_guard<std::mutex> lg(*visitors_mutex_);
+      RETURN_NOT_OK(options_.writer_pre_finish(writer_.get()));
+    }
+    RETURN_NOT_OK(writer_->Finish());
+    {
+      std::lock_guard<std::mutex> lg(*visitors_mutex_);
+      return options_.writer_post_finish(writer_.get());
+    }
+  }
+
+  void Abort(Status err) {
+    std::vector<Future<uint64_t>> futures_to_abort;
+    Future<> old_running_task = running_task_;
+    {
+      std::lock_guard<std::mutex> lg(mutex);
+      write_queue_.clear();
+      futures_to_abort =
+          std::vector<Future<uint64_t>>(write_futures_.begin(), 
write_futures_.end());
+      write_futures_.clear();
+      running_task_ = Future<>();
+    }
+    for (auto& fut : futures_to_abort) {
+      fut.MarkFinished(err);
+    }
+    old_running_task.MarkFinished(std::move(err));
+  }
+
+  void Flush() {
+    std::unique_lock<std::mutex> lk(mutex);
+    FlushUnlocked(std::move(lk));
+  }
+
+  void FlushUnlocked(std::unique_lock<std::mutex> lk) {
+    if (write_queue_.empty()) {
+      Future<> old_running_task = running_task_;
+      running_task_ = Future<>();
+      lk.unlock();
+      old_running_task.MarkFinished();
+      return;
+    }
+    WriteNext().AddCallback([this](const Result<uint64_t>& res) {
+      if (res.ok()) {
+        Flush();
+      } else {
+        Abort(res.status());
+      }
+    });
+  }
+
+  const FileSystemDatasetWriteOptions& options_;
+  std::mutex* visitors_mutex_;
+  std::shared_ptr<FileWriter> writer_;
+  std::mutex mutex;
+  std::list<std::shared_ptr<RecordBatch>> write_queue_;
+  std::list<Future<uint64_t>> write_futures_;
+  Future<> running_task_;
+};
+
+struct WriteTask {
+  std::string filename;
+  uint64_t num_rows;
+};
+
+class DatasetWriterDirectoryQueue : public util::AsyncDestroyable {
+ public:
+  DatasetWriterDirectoryQueue(std::string directory, std::shared_ptr<Schema> 
schema,
+                              const FileSystemDatasetWriteOptions& 
write_options,
+                              Throttle* open_files_throttle, std::mutex* 
visitors_mutex)
+      : directory_(std::move(directory)),
+        schema_(std::move(schema)),
+        write_options_(write_options),
+        open_files_throttle_(open_files_throttle),
+        visitors_mutex_(visitors_mutex) {}
+
+  Result<std::shared_ptr<RecordBatch>> NextWritableChunk(
+      std::shared_ptr<RecordBatch> batch, std::shared_ptr<RecordBatch>* 
remainder,
+      bool* will_open_file) const {
+    DCHECK_GT(batch->num_rows(), 0);
+    uint64_t rows_available = std::numeric_limits<uint64_t>::max();
+    *will_open_file = rows_written_ == 0;
+    if (write_options_.max_rows_per_file > 0) {
+      rows_available = write_options_.max_rows_per_file - rows_written_;
+    }
+
+    std::shared_ptr<RecordBatch> to_queue;
+    if (rows_available < static_cast<uint64_t>(batch->num_rows())) {
+      to_queue = batch->Slice(0, static_cast<int64_t>(rows_available));
+      *remainder = batch->Slice(static_cast<int64_t>(rows_available));
+    } else {
+      to_queue = std::move(batch);
+    }
+    return to_queue;
+  }
+
+  Future<WriteTask> StartWrite(const std::shared_ptr<RecordBatch>& batch) {
+    rows_written_ += batch->num_rows();
+    WriteTask task{current_filename_, 
static_cast<uint64_t>(batch->num_rows())};
+    if (!latest_open_file_) {
+      ARROW_ASSIGN_OR_RAISE(latest_open_file_, 
OpenFileQueue(current_filename_));
+    }
+    return latest_open_file_->Push(batch).Then([task] { return task; });
+  }
+
+  Result<std::string> GetNextFilename() {
+    auto basename = ::arrow::internal::Replace(
+        write_options_.basename_template, kIntegerToken, 
std::to_string(file_counter_++));
+    if (!basename) {
+      return Status::Invalid("string interpolation of basename template 
failed");
+    }
+
+    return fs::internal::ConcatAbstractPath(directory_, *basename);
+  }
+
+  Status FinishCurrentFile() {
+    if (latest_open_file_) {
+      latest_open_file_ = nullptr;
+    }
+    rows_written_ = 0;
+    return GetNextFilename().Value(&current_filename_);
+  }
+
+  Result<std::shared_ptr<FileWriter>> OpenWriter(const std::string& filename) {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<io::OutputStream> out_stream,
+                          
write_options_.filesystem->OpenOutputStream(filename));
+    return write_options_.format()->MakeWriter(std::move(out_stream), schema_,
+                                               
write_options_.file_write_options,
+                                               {write_options_.filesystem, 
filename});
+  }
+
+  Result<std::shared_ptr<DatasetWriterFileQueue>> OpenFileQueue(
+      const std::string& filename) {
+    Future<std::shared_ptr<FileWriter>> file_writer_fut =
+        init_future_.Then([this, filename] {
+          ::arrow::internal::Executor* io_executor =
+              write_options_.filesystem->io_context().executor();
+          return DeferNotOk(
+              io_executor->Submit([this, filename]() { return 
OpenWriter(filename); }));
+        });
+    auto file_queue = util::MakeSharedAsync<DatasetWriterFileQueue>(
+        file_writer_fut, write_options_, visitors_mutex_);
+    RETURN_NOT_OK(task_group_.AddTask(
+        file_queue->on_closed().Then([this] { 
open_files_throttle_->Release(1); })));
+    return file_queue;
+  }
+
+  uint64_t rows_written() const { return rows_written_; }
+
+  void PrepareDirectory() {
+    init_future_ =
+        
DeferNotOk(write_options_.filesystem->io_context().executor()->Submit([this] {
+          RETURN_NOT_OK(write_options_.filesystem->CreateDir(directory_));
+          if (write_options_.existing_data_behavior == 
kDeleteMatchingPartitions) {
+            fs::FileSelector selector;
+            selector.base_dir = directory_;
+            selector.recursive = true;
+            return write_options_.filesystem->DeleteFiles(selector);

Review comment:
       Good catch.  I switched to using that and removed my changes to 
`filesystem.h`

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3295,8 +3295,8 @@ def test_write_dataset_with_scanner(tempdir):
     dataset = ds.dataset(tempdir, partitioning=["b"])
 
     with tempfile.TemporaryDirectory() as tempdir2:
-        ds.write_dataset(dataset.scanner(columns=["b", "c"]), tempdir2,
-                         format='parquet', partitioning=["b"])
+        ds.write_dataset(dataset.scanner(columns=["b", "c"], use_async=True),

Review comment:
       Yes.  I'm hoping most users are not creating scanners directly so this 
change should be fairly smooth.

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -3656,16 +3650,12 @@ def file_visitor(written_file):
 
     # Since it is a multi-threaded write there is no way to know which
     # directory gets part-0 and which gets part-1
-    expected_paths_a = {
+    expected_paths = {

Review comment:
       Good catch.  Fixed.

##########
File path: cpp/src/arrow/dataset/dataset_writer.h
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/dataset/file_base.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace dataset {
+
+constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024;
+
+/// \brief Utility class that manages a set of writers to different paths
+///
+/// Writers may be closed and reopened (and a new file created) based on the 
dataset
+/// write options (for example, min_rows_per_file or max_open_files)
+///
+/// The dataset writer enforces its own back pressure based on the # of rows 
(as opposed
+/// to # of batches which is how it is typically enforced elsewhere) and # of 
files.
+class DatasetWriter {
+ public:
+  /// \brief Creates a dataset writer
+  /// max_rows_queued represents the max number of rows allowed in the dataset 
writer
+  /// at any given time.
+  DatasetWriter(FileSystemDatasetWriteOptions write_options,
+                std::shared_ptr<Schema> schema,
+                uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued);
+
+  ~DatasetWriter();
+
+  /// \brief Writes a batch to the dataset
+  /// \param[in] directory The directory to write to
+  ///
+  /// Note: The written filename will be {directory}/{filename_factory(i)} 
where i is a
+  /// counter controlled by `max_open_files` and `max_rows_per_file`
+  ///
+  /// If multiple WriteRecordBatch calls arrive with the same `directory` then 
the batches
+  /// may be written to the same file.

Review comment:
       I created ARROW-14164

##########
File path: cpp/src/arrow/dataset/file_base.h
##########
@@ -343,6 +343,18 @@ class ARROW_DS_EXPORT FileWriter {
   fs::FileLocator destination_locator_;
 };
 
+/// \brief Controls what happens if files exist in an output directory during 
a dataset
+/// write
+enum ExistingDataBehavior : int8_t {
+  /// Deletes all files in a directory the first time that directory is 
encountered
+  kDeleteMatchingPartitions,
+  /// Ignores existing files, overwriting any that happen to have the same 
name as an
+  /// output file
+  kOverwriteOrIgnore,
+  /// Returns an error if there are any files or subdirectories in the output 
directory
+  kError,

Review comment:
       Yes, you described the problem well.  The rollback would be needed.  I 
created ARROW-14175

##########
File path: cpp/src/arrow/filesystem/test_util.h
##########
@@ -38,6 +40,23 @@ static inline FileInfo Dir(std::string path) {
   return FileInfo(std::move(path), FileType::Directory);
 }
 
+class ARROW_TESTING_EXPORT GatedMockFilesystem : public 
internal::MockFileSystem {

Review comment:
       Done.

##########
File path: cpp/src/arrow/filesystem/test_util.cc
##########
@@ -167,6 +165,27 @@ void AssertFileInfo(FileSystem* fs, const std::string& 
path, FileType type,
   AssertFileInfo(info, path, type, size);
 }
 
+GatedMockFilesystem::GatedMockFilesystem(TimePoint current_time,
+                                         const io::IOContext& io_context)
+    : internal::MockFileSystem(current_time, io_context) {}
+GatedMockFilesystem::~GatedMockFilesystem() = default;
+
+Result<std::shared_ptr<io::OutputStream>> 
GatedMockFilesystem::OpenOutputStream(
+    const std::string& path, const std::shared_ptr<const KeyValueMetadata>& 
metadata) {
+  RETURN_NOT_OK(open_output_sem_.Acquire(1));
+  return MockFileSystem::OpenOutputStream(path, metadata);
+}
+
+// Wait until at least num_waiters are waiting on OpenOutputStream

Review comment:
       I moved these to the .h file.

##########
File path: cpp/src/arrow/dataset/dataset_writer.h
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/dataset/file_base.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/async_util.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace dataset {
+
+constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024;
+
+/// \brief Utility class that manages a set of writers to different paths
+///
+/// Writers may be closed and reopened (and a new file created) based on the 
dataset
+/// write options (for example, max_rows_per_file or max_open_files)
+///
+/// The dataset writer enforces its own back pressure based on the # of rows 
(as opposed
+/// to # of batches which is how it is typically enforced elsewhere) and # of 
files.
+class ARROW_DS_EXPORT DatasetWriter {
+ public:
+  /// \brief Creates a dataset writer

Review comment:
       Thanks, fixed.

##########
File path: cpp/src/arrow/dataset/dataset_writer.h
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/dataset/file_base.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/async_util.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace dataset {
+
+constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024;
+
+/// \brief Utility class that manages a set of writers to different paths
+///
+/// Writers may be closed and reopened (and a new file created) based on the 
dataset
+/// write options (for example, max_rows_per_file or max_open_files)
+///
+/// The dataset writer enforces its own back pressure based on the # of rows 
(as opposed
+/// to # of batches which is how it is typically enforced elsewhere) and # of 
files.
+class ARROW_DS_EXPORT DatasetWriter {
+ public:
+  /// \brief Creates a dataset writer
+  ///
+  /// Will fail if basename_template is invalid or if there is existing data 
and
+  /// existing_data_behavior is kError
+  ///
+  /// \param write_options options to control how the data should be written
+  /// \param max_rows_queued max # of rows allowed to be queued before the 
dataset_writer
+  ///                        will ask for backpressure
+  static Result<std::unique_ptr<DatasetWriter>> Make(
+      FileSystemDatasetWriteOptions write_options,
+      uint64_t max_rows_queued = kDefaultDatasetWriterMaxRowsQueued);
+
+  ~DatasetWriter();
+
+  /// \brief Writes a batch to the dataset
+  /// \param[in] batch The batch to write
+  /// \param[in] directory The directory to write to
+  ///
+  /// Note: The written filename will be {directory}/{filename_factory(i)} 
where i is a
+  /// counter controlled by `max_open_files` and `max_rows_per_file`
+  ///
+  /// If multiple WriteRecordBatch calls arrive with the same `directory` then 
the batches
+  /// may be written to the same file.
+  ///
+  /// The returned future will be marked finished when the record batch has 
been queued
+  /// to be written.  If the returned future is unfinished then this indicates 
the dataset
+  /// writer's queue is full and the data provider should pause.
+  ///
+  /// This method is NOT async reentrant.  The returned future will only be 
incomplete

Review comment:
       I changed this to "unfinished"; let me know if that is enough.  I mean that the 
returned future will always be marked finished unless the dataset writer is 
"full", in which case it will return an unfinished future that gets marked 
finished when it is safe to write again.

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+struct ExpectedFile {
+  std::string filename;
+  uint64_t start;
+  uint64_t num_rows;
+};
+
+class DatasetWriterTestFixture : public testing::Test {
+ protected:
+  void SetUp() override {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                         MockFileSystem::Make(mock_now, 
{::arrow::fs::Dir("testdir")}));
+    filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
+    schema_ = schema({field("int64", int64())});
+    write_options_.filesystem = filesystem_;
+    write_options_.basename_template = "part-{i}.arrow";
+    write_options_.base_dir = "testdir";
+    write_options_.writer_pre_finish = [this](FileWriter* writer) {
+      pre_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    write_options_.writer_post_finish = [this](FileWriter* writer) {
+      post_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>();
+    write_options_.file_write_options = format->DefaultWriteOptions();
+  }
+
+  std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now);
+    ARROW_EXPECT_OK(fs->CreateDir("testdir"));
+    write_options_.filesystem = fs;
+    filesystem_ = fs;
+    return fs;
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t start, uint64_t num_rows) {
+    Int64Builder builder;
+    for (uint64_t i = 0; i < num_rows; i++) {
+      ARROW_EXPECT_OK(builder.Append(i + start));
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish());
+    return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), 
{std::move(arr)});
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) {
+    std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows);
+    counter_ += num_rows;
+    return batch;
+  }
+
+  util::optional<MockFileInfo> FindFile(const std::string& filename) {
+    for (const auto& mock_file : filesystem_->AllFiles()) {
+      if (mock_file.full_path == filename) {
+        return mock_file;
+      }
+    }
+    return util::nullopt;
+  }
+
+  void AssertVisited(const std::vector<std::string>& actual_paths,
+                     const std::string& expected_path) {
+    std::vector<std::string>::const_iterator found =

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+struct ExpectedFile {

Review comment:
       Done.

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+struct ExpectedFile {
+  std::string filename;
+  uint64_t start;
+  uint64_t num_rows;
+};
+
+class DatasetWriterTestFixture : public testing::Test {
+ protected:
+  void SetUp() override {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                         MockFileSystem::Make(mock_now, 
{::arrow::fs::Dir("testdir")}));
+    filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
+    schema_ = schema({field("int64", int64())});
+    write_options_.filesystem = filesystem_;
+    write_options_.basename_template = "part-{i}.arrow";
+    write_options_.base_dir = "testdir";
+    write_options_.writer_pre_finish = [this](FileWriter* writer) {
+      pre_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    write_options_.writer_post_finish = [this](FileWriter* writer) {
+      post_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>();
+    write_options_.file_write_options = format->DefaultWriteOptions();
+  }
+
+  std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now);
+    ARROW_EXPECT_OK(fs->CreateDir("testdir"));
+    write_options_.filesystem = fs;
+    filesystem_ = fs;
+    return fs;
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t start, uint64_t num_rows) {
+    Int64Builder builder;
+    for (uint64_t i = 0; i < num_rows; i++) {
+      ARROW_EXPECT_OK(builder.Append(i + start));
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish());
+    return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), 
{std::move(arr)});
+  }
+
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) {
+    std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows);
+    counter_ += num_rows;
+    return batch;
+  }
+
+  util::optional<MockFileInfo> FindFile(const std::string& filename) {
+    for (const auto& mock_file : filesystem_->AllFiles()) {
+      if (mock_file.full_path == filename) {
+        return mock_file;
+      }
+    }
+    return util::nullopt;
+  }
+
+  void AssertVisited(const std::vector<std::string>& actual_paths,
+                     const std::string& expected_path) {
+    std::vector<std::string>::const_iterator found =
+        std::find(actual_paths.begin(), actual_paths.end(), expected_path);
+    ASSERT_NE(found, actual_paths.end())
+        << "The file " << expected_path << " was not in the list of files 
visited";
+  }
+
+  std::shared_ptr<RecordBatch> ReadAsBatch(util::string_view data) {
+    std::shared_ptr<io::RandomAccessFile> in_stream =
+        std::make_shared<io::BufferReader>(data);
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader,
+                         
ipc::RecordBatchFileReader::Open(std::move(in_stream)));
+    RecordBatchVector batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> next_batch,
+                           reader->ReadRecordBatch(i));
+      batches.push_back(next_batch);
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> table, 
Table::FromRecordBatches(batches));
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> combined_table, 
table->CombineChunks());
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> batch,
+                         TableBatchReader(*combined_table).Next());
+    return batch;
+  }
+
+  void AssertFileCreated(const util::optional<MockFileInfo>& maybe_file,
+                         const std::string& expected_filename) {
+    ASSERT_TRUE(maybe_file.has_value())
+        << "The file " << expected_filename << " was not created";
+    {
+      SCOPED_TRACE("pre_finish");
+      AssertVisited(pre_finish_visited_, expected_filename);
+    }
+    {
+      SCOPED_TRACE("post_finish");
+      AssertVisited(post_finish_visited_, expected_filename);
+    }
+  }
+
+  void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
+    counter_ = 0;
+    for (const auto& expected_file : expected_files) {
+      util::optional<MockFileInfo> written_file = 
FindFile(expected_file.filename);
+      AssertFileCreated(written_file, expected_file.filename);
+      AssertBatchesEqual(*MakeBatch(expected_file.start, 
expected_file.num_rows),
+                         *ReadAsBatch(written_file->data));
+    }
+  }
+
+  void AssertFilesCreated(const std::vector<std::string>& expected_files) {
+    for (const std::string& expected_file : expected_files) {
+      util::optional<MockFileInfo> written_file = FindFile(expected_file);
+      AssertFileCreated(written_file, expected_file);
+    }
+  }
+
+  void AssertNotFiles(const std::vector<std::string>& expected_non_files) {
+    for (const auto& expected_non_file : expected_non_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_non_file);
+      ASSERT_FALSE(file.has_value());
+    }
+  }
+
+  void AssertEmptyFiles(const std::vector<std::string>& expected_empty_files) {
+    for (const auto& expected_empty_file : expected_empty_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_empty_file);
+      ASSERT_TRUE(file.has_value());
+      ASSERT_EQ("", file->data);
+    }
+  }
+
+  std::shared_ptr<MockFileSystem> filesystem_;
+  std::shared_ptr<Schema> schema_;
+  std::vector<std::string> pre_finish_visited_;
+  std::vector<std::string> post_finish_visited_;
+  FileSystemDatasetWriteOptions write_options_;
+  uint64_t counter_ = 0;
+};
+
+TEST_F(DatasetWriterTestFixture, Basic) {
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(100), "");
+  AssertFinished(queue_fut);
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertCreatedData({{"testdir/part-0.arrow", 0, 100}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(35), "");
+  AssertFinished(queue_fut);
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertCreatedData({{"testdir/part-0.arrow", 0, 10},
+                     {"testdir/part-1.arrow", 10, 10},
+                     {"testdir/part-2.arrow", 20, 10},
+                     {"testdir/part-3.arrow", 30, 5}});
+}
+
+TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(3), ""));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertCreatedData({{"testdir/part-0.arrow", 0, 10}, {"testdir/part-1.arrow", 
10, 8}});
+}
+
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
+  auto gated_fs = UseGatedFs();

Review comment:
       I added a comment.  It is basically to ensure we queue up behind a 
single long-running "open" job instead of attempting a second open or trying to 
use an unopened file.

##########
File path: r/R/dataset-write.R
##########
@@ -111,7 +111,7 @@ write_dataset <- function(dataset,
     dataset <- dplyr::ungroup(dataset)
   }
 
-  scanner <- Scanner$create(dataset)
+  scanner <- Scanner$create(dataset, use_async=TRUE)

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/util/counting_semaphore.h
##########
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_COUNTING_SEMAPHORE_H
+#define ARROW_COUNTING_SEMAPHORE_H
+
+#include <memory>
+
+#include "arrow/status.h"
+
+namespace arrow {
+namespace util {
+
+/// \brief Simple mutex-based counting semaphore with timeout
+class ARROW_EXPORT CountingSemaphore {
+ public:
+  /// \brief Create an instance with initial_avail starting permits
+  ///
+  /// \param[in] initial_avail The semaphore will start with this many permits 
available
+  /// \param[in] timeout_seconds A timeout to be applied to all operations.  
Operations
+  ///            will return Status::Invalid if this timeout elapses
+  explicit CountingSemaphore(uint32_t initial_avail = 0, double 
timeout_seconds = 10);
+  ~CountingSemaphore();
+  /// \brief Block until num_permits permits are available
+  Status Acquire(uint32_t num_permits);
+  /// \brief Make num_permits permits available
+  Status Release(uint32_t num_permits);
+  /// \brief Waits until num_waiters are waiting on permits

Review comment:
       Fixed the grammar.  Yes, a barrier is a good description.

##########
File path: cpp/src/arrow/util/CMakeLists.txt
##########
@@ -48,6 +48,7 @@ add_arrow_test(utility-test
                cache_test.cc
                checked_cast_test.cc
                compression_test.cc
+               counting_semaphore_test.cc

Review comment:
       Done.

##########
File path: cpp/src/arrow/testing/gtest_util.h
##########
@@ -94,7 +94,7 @@
 
 #define ARROW_EXPECT_OK(expr)                                           \
   do {                                                                  \
-    auto _res = (expr);                                                 \
+    const auto& _res = (expr);                                          \

Review comment:
       Everything still runs OK, and I think the lifetime of the temporary is 
extended to the lifetime of the reference: 
https://stackoverflow.com/questions/42868744/what-happens-when-assigning-an-rvalue-to-const-auto
   
   Full disclosure: I did not recognize this myself; it was my IDE complaining. 
 I don't feel strongly about the change and am happy to revert it.

##########
File path: cpp/src/arrow/dataset/dataset_writer.h
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+
+#include "arrow/dataset/file_base.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/util/async_util.h"
+#include "arrow/util/future.h"
+
+namespace arrow {
+namespace dataset {
+
+constexpr uint64_t kDefaultDatasetWriterMaxRowsQueued = 64 * 1024 * 1024;
+
+/// \brief Utility class that manages a set of writers to different paths
+///
+/// Writers may be closed and reopened (and a new file created) based on the 
dataset
+/// write options (for example, max_rows_per_file or max_open_files)
+///
+/// The dataset writer enforces its own back pressure based on the # of rows 
(as opposed
+/// to # of batches which is how it is typically enforced elsewhere) and # of 
files.
+class ARROW_DS_EXPORT DatasetWriter {

Review comment:
       It might be useful externally, so I don't mind doing so, but I have no 
desire to create work that isn't asked for.  I moved this to internal.

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+struct ExpectedFile {
+  std::string filename;
+  uint64_t start;
+  uint64_t num_rows;
+};
+
+class DatasetWriterTestFixture : public testing::Test {
+ protected:
+  void SetUp() override {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                         MockFileSystem::Make(mock_now, 
{::arrow::fs::Dir("testdir")}));
+    filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
+    schema_ = schema({field("int64", int64())});
+    write_options_.filesystem = filesystem_;
+    write_options_.basename_template = "part-{i}.arrow";

Review comment:
       Fixed.

##########
File path: cpp/src/arrow/dataset/dataset_writer_test.cc
##########
@@ -0,0 +1,344 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/dataset/dataset_writer.h"
+
+#include <chrono>
+#include <mutex>
+#include <vector>
+
+#include "arrow/dataset/file_ipc.h"
+#include "arrow/filesystem/mockfs.h"
+#include "arrow/filesystem/test_util.h"
+#include "arrow/io/memory.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/table.h"
+#include "arrow/testing/future_util.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/util/optional.h"
+#include "gtest/gtest.h"
+
+namespace arrow {
+namespace dataset {
+
+using arrow::fs::internal::MockFileInfo;
+using arrow::fs::internal::MockFileSystem;
+
+// Describes one file the dataset writer is expected to have produced.
+struct ExpectedFile {
+  // Full path of the file as looked up in the mock filesystem.
+  std::string filename;
+  // First value of the consecutive int64 sequence expected in the file.
+  uint64_t start;
+  // Number of rows expected in the file.
+  uint64_t num_rows;
+};
+
+// Test fixture that points a DatasetWriter at an in-memory mock filesystem and
+// records the destination path of every file passed to the pre-finish and
+// post-finish callbacks.
+class DatasetWriterTestFixture : public testing::Test {
+ protected:
+  // Creates a mock filesystem containing "testdir" and configures write_options_
+  // to write IPC files named "part-{i}.arrow" under that directory.
+  void SetUp() override {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                         MockFileSystem::Make(mock_now, {::arrow::fs::Dir("testdir")}));
+    filesystem_ = std::dynamic_pointer_cast<MockFileSystem>(fs);
+    schema_ = schema({field("int64", int64())});
+    write_options_.filesystem = filesystem_;
+    write_options_.basename_template = "part-{i}.arrow";
+    write_options_.base_dir = "testdir";
+    // The callbacks only record which destinations they were invoked for.
+    write_options_.writer_pre_finish = [this](FileWriter* writer) {
+      pre_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    write_options_.writer_post_finish = [this](FileWriter* writer) {
+      post_finish_visited_.push_back(writer->destination().path);
+      return Status::OK();
+    };
+    std::shared_ptr<FileFormat> format = std::make_shared<IpcFileFormat>();
+    write_options_.file_write_options = format->DefaultWriteOptions();
+  }
+
+  // Replaces the filesystem with a GatedMockFilesystem that contains "testdir".
+  // Must be called before DatasetWriter::Make for the writer to pick it up.
+  // Returns the gated filesystem so the test can wait on / unlock it.
+  std::shared_ptr<fs::GatedMockFilesystem> UseGatedFs() {
+    fs::TimePoint mock_now = std::chrono::system_clock::now();
+    auto fs = std::make_shared<fs::GatedMockFilesystem>(mock_now);
+    ARROW_EXPECT_OK(fs->CreateDir("testdir"));
+    write_options_.filesystem = fs;
+    filesystem_ = fs;
+    return fs;
+  }
+
+  // Builds a single-column batch holding start, start + 1, ..., start + num_rows - 1.
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t start, uint64_t num_rows) {
+    Int64Builder builder;
+    for (uint64_t i = 0; i < num_rows; i++) {
+      ARROW_EXPECT_OK(builder.Append(i + start));
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Array> arr, builder.Finish());
+    return RecordBatch::Make(schema_, static_cast<int64_t>(num_rows), {std::move(arr)});
+  }
+
+  // Builds a batch of num_rows values continuing from counter_, then advances
+  // counter_ so that successive calls yield one consecutive sequence.
+  std::shared_ptr<RecordBatch> MakeBatch(uint64_t num_rows) {
+    std::shared_ptr<RecordBatch> batch = MakeBatch(counter_, num_rows);
+    counter_ += num_rows;
+    return batch;
+  }
+
+  // Returns the mock file whose full path equals filename, or nullopt if the
+  // filesystem holds no such file.
+  util::optional<MockFileInfo> FindFile(const std::string& filename) {
+    for (const auto& mock_file : filesystem_->AllFiles()) {
+      if (mock_file.full_path == filename) {
+        return mock_file;
+      }
+    }
+    return util::nullopt;
+  }
+
+  // Fails fatally unless expected_path appears in actual_paths.
+  void AssertVisited(const std::vector<std::string>& actual_paths,
+                     const std::string& expected_path) {
+    std::vector<std::string>::const_iterator found =
+        std::find(actual_paths.begin(), actual_paths.end(), expected_path);
+    ASSERT_NE(found, actual_paths.end())
+        << "The file " << expected_path << " was not in the list of files visited";
+  }
+
+  // Parses data as an IPC file and returns all of its record batches combined
+  // into a single batch.
+  std::shared_ptr<RecordBatch> ReadAsBatch(util::string_view data) {
+    std::shared_ptr<io::RandomAccessFile> in_stream =
+        std::make_shared<io::BufferReader>(data);
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<ipc::RecordBatchFileReader> reader,
+                         ipc::RecordBatchFileReader::Open(std::move(in_stream)));
+    RecordBatchVector batches;
+    for (int i = 0; i < reader->num_record_batches(); i++) {
+      EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> next_batch,
+                           reader->ReadRecordBatch(i));
+      batches.push_back(next_batch);
+    }
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> table, Table::FromRecordBatches(batches));
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<Table> combined_table, table->CombineChunks());
+    EXPECT_OK_AND_ASSIGN(std::shared_ptr<RecordBatch> batch,
+                         TableBatchReader(*combined_table).Next());
+    return batch;
+  }
+
+  // Fails fatally unless maybe_file holds a value and expected_filename was seen
+  // by both the pre-finish and the post-finish callback.
+  void AssertFileCreated(const util::optional<MockFileInfo>& maybe_file,
+                         const std::string& expected_filename) {
+    ASSERT_TRUE(maybe_file.has_value())
+        << "The file " << expected_filename << " was not created";
+    {
+      SCOPED_TRACE("pre_finish");
+      AssertVisited(pre_finish_visited_, expected_filename);
+    }
+    {
+      SCOPED_TRACE("post_finish");
+      AssertVisited(post_finish_visited_, expected_filename);
+    }
+  }
+
+  // Checks that every expected file exists, was visited by both callbacks, and
+  // contains exactly the rows [start, start + num_rows). Resets counter_ to 0.
+  void AssertCreatedData(const std::vector<ExpectedFile>& expected_files) {
+    counter_ = 0;
+    for (const auto& expected_file : expected_files) {
+      util::optional<MockFileInfo> written_file = FindFile(expected_file.filename);
+      // A fatal assertion inside a helper only returns from that helper, so stop
+      // here explicitly; otherwise an empty optional would be dereferenced below.
+      ASSERT_NO_FATAL_FAILURE(AssertFileCreated(written_file, expected_file.filename));
+      AssertBatchesEqual(*MakeBatch(expected_file.start, expected_file.num_rows),
+                         *ReadAsBatch(written_file->data));
+    }
+  }
+
+  // Checks that every listed file exists and was visited by both callbacks,
+  // without inspecting its contents.
+  void AssertFilesCreated(const std::vector<std::string>& expected_files) {
+    for (const std::string& expected_file : expected_files) {
+      util::optional<MockFileInfo> written_file = FindFile(expected_file);
+      AssertFileCreated(written_file, expected_file);
+    }
+  }
+
+  // Fails fatally if any of the listed paths exists in the filesystem.
+  void AssertNotFiles(const std::vector<std::string>& expected_non_files) {
+    for (const auto& expected_non_file : expected_non_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_non_file);
+      ASSERT_FALSE(file.has_value());
+    }
+  }
+
+  // Fails fatally unless every listed path exists and holds no data.
+  void AssertEmptyFiles(const std::vector<std::string>& expected_empty_files) {
+    for (const auto& expected_empty_file : expected_empty_files) {
+      util::optional<MockFileInfo> file = FindFile(expected_empty_file);
+      ASSERT_TRUE(file.has_value());
+      ASSERT_EQ("", file->data);
+    }
+  }
+
+  // Filesystem the writer targets; replaced by UseGatedFs().
+  std::shared_ptr<MockFileSystem> filesystem_;
+  // Single int64 column schema used for every generated batch.
+  std::shared_ptr<Schema> schema_;
+  // Destination paths seen by the pre-finish / post-finish callbacks.
+  std::vector<std::string> pre_finish_visited_;
+  std::vector<std::string> post_finish_visited_;
+  FileSystemDatasetWriteOptions write_options_;
+  // Next start value handed out by the single-argument MakeBatch.
+  uint64_t counter_ = 0;
+};
+
+// A single 100-row batch written with default options ends up in one file.
+TEST_F(DatasetWriterTestFixture, Basic) {
+  EXPECT_OK_AND_ASSIGN(auto writer, DatasetWriter::Make(write_options_));
+  std::shared_ptr<RecordBatch> batch = MakeBatch(100);
+  Future<> write_fut = writer->WriteRecordBatch(std::move(batch), "");
+  AssertFinished(write_fut);
+  ASSERT_FINISHES_OK(writer->Finish());
+  const std::vector<ExpectedFile> expected = {{"testdir/part-0.arrow", 0, 100}};
+  AssertCreatedData(expected);
+}
+
+// One 35-row batch with a 10-row file limit is split across four files.
+TEST_F(DatasetWriterTestFixture, MaxRowsOneWrite) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto writer, DatasetWriter::Make(write_options_));
+  std::shared_ptr<RecordBatch> batch = MakeBatch(35);
+  Future<> write_fut = writer->WriteRecordBatch(std::move(batch), "");
+  AssertFinished(write_fut);
+  ASSERT_FINISHES_OK(writer->Finish());
+  const std::vector<ExpectedFile> expected = {
+      {"testdir/part-0.arrow", 0, 10},
+      {"testdir/part-1.arrow", 10, 10},
+      {"testdir/part-2.arrow", 20, 10},
+      {"testdir/part-3.arrow", 30, 5},
+  };
+  AssertCreatedData(expected);
+}
+
+// Six 3-row batches with a 10-row file limit fill one file and spill the
+// remaining 8 rows into a second one.
+TEST_F(DatasetWriterTestFixture, MaxRowsManyWrites) {
+  write_options_.max_rows_per_file = 10;
+  EXPECT_OK_AND_ASSIGN(auto writer, DatasetWriter::Make(write_options_));
+  constexpr int kNumWrites = 6;
+  for (int write_idx = 0; write_idx < kNumWrites; ++write_idx) {
+    ASSERT_FINISHES_OK(writer->WriteRecordBatch(MakeBatch(3), ""));
+  }
+  ASSERT_FINISHES_OK(writer->Finish());
+  const std::vector<ExpectedFile> expected = {
+      {"testdir/part-0.arrow", 0, 10},
+      {"testdir/part-1.arrow", 10, 8},
+  };
+  AssertCreatedData(expected);
+}
+
+// Queues ten 10-row batches for the same directory while the filesystem is
+// gated, then releases the gate and expects a single 100-row file.
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesSameFile) {
+  // Must be installed before the writer is created so the writer uses it.
+  auto gated_fs = UseGatedFs();
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  for (int i = 0; i < 10; i++) {
+    Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "");
+    // The queue future is expected to be complete already, i.e. queuing does
+    // not wait for the gated filesystem.
+    AssertFinished(queue_fut);
+    ASSERT_FINISHES_OK(queue_fut);
+  }
+  // NOTE(review): presumably waits for one pending OpenOutputStream call and
+  // then lets it proceed -- confirm against GatedMockFilesystem.
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(1));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(1));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertCreatedData({{"testdir/part-0.arrow", 0, 100}});
+}
+
+// Queues one 10-row batch per directory ("part0" .. "part5") while the
+// filesystem is gated, then releases the gate and expects one file per
+// directory holding that directory's batch.
+TEST_F(DatasetWriterTestFixture, ConcurrentWritesDifferentFiles) {
+  // NBATCHES must be less than I/O executor concurrency to avoid deadlock / 
test failure
+  constexpr int NBATCHES = 6;
+  // Must be installed before the writer is created so the writer uses it.
+  auto gated_fs = UseGatedFs();
+  std::vector<ExpectedFile> expected_files;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+  for (int i = 0; i < NBATCHES; i++) {
+    std::string i_str = std::to_string(i);
+    // Batch i carries rows [10 * i, 10 * i + 10) because MakeBatch(10)
+    // continues the fixture's running counter.
+    expected_files.push_back(ExpectedFile{"testdir/part" + i_str + 
"/part-0.arrow",
+                                          static_cast<uint64_t>(i) * 10, 10});
+    Future<> queue_fut = dataset_writer->WriteRecordBatch(MakeBatch(10), 
"part" + i_str);
+    AssertFinished(queue_fut);
+    ASSERT_FINISHES_OK(queue_fut);
+  }
+  // NOTE(review): presumably waits for NBATCHES pending OpenOutputStream calls
+  // and then lets them all proceed -- confirm against GatedMockFilesystem.
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(NBATCHES));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(NBATCHES));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  AssertCreatedData(expected_files);
+}
+
+// With max_open_files = 2, a write to a third directory must not complete
+// until the gated filesystem is released; afterwards writes continue and the
+// expected set of files is checked by name only.
+TEST_F(DatasetWriterTestFixture, MaxOpenFiles) {
+  auto gated_fs = UseGatedFs();
+  write_options_.max_open_files = 2;
+  EXPECT_OK_AND_ASSIGN(auto dataset_writer, 
DatasetWriter::Make(write_options_));
+
+  // Two distinct directories are in use after these three writes.
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1"));
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  // A third directory exceeds the limit of two.
+  Future<> fut = dataset_writer->WriteRecordBatch(MakeBatch(10), "part2");
+  // Backpressure will be applied until an existing file can be evicted
+  AssertNotFinished(fut);
+
+  // Ungate the writes to relieve the pressure, testdir/part0 should be closed
+  // NOTE(review): 5 unlocks presumably cover every file opened by the rest of
+  // this test (the assertions below name four files) -- confirm.
+  ASSERT_OK(gated_fs->WaitForOpenOutputStream(2));
+  ASSERT_OK(gated_fs->UnlockOpenOutputStream(5));
+  ASSERT_FINISHES_OK(fut);
+
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part0"));
+  // Following call should resume existing write but, on slow test systems, 
the old
+  // write may have already been finished
+  ASSERT_FINISHES_OK(dataset_writer->WriteRecordBatch(MakeBatch(10), "part1"));
+  ASSERT_FINISHES_OK(dataset_writer->Finish());
+  // Only existence and callback visits are checked here, not file contents.
+  AssertFilesCreated({"testdir/part0/part-0.arrow", 
"testdir/part0/part-1.arrow",
+                      "testdir/part1/part-0.arrow", 
"testdir/part2/part-0.arrow"});
+}
+
+TEST_F(DatasetWriterTestFixture, DeleteExistingData) {
+  fs::TimePoint mock_now = std::chrono::system_clock::now();
+  ASSERT_OK_AND_ASSIGN(std::shared_ptr<fs::FileSystem> fs,
+                       MockFileSystem::Make(mock_now, 
{::arrow::fs::Dir("testdir"),
+                                                       
fs::File("testdir/part-5.arrow"),
+                                                       
fs::File("testdir/blah.txt")}));

Review comment:
       I added the case.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to