bkietz commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r498813432



##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +145,205 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-struct WriteTask {
-  Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+  for (std::shared_ptr<RecordBatch> batch;;) {

Review comment:
       I personally prefer `for(;;)` to `while(true)`, but if the latter is clearer, I'll rewrite it.

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +145,205 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
   return MakeVectorIterator(std::move(fragments));
 }
 
-struct WriteTask {
-  Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+  for (std::shared_ptr<RecordBatch> batch;;) {
+    RETURN_NOT_OK(batches->ReadNext(&batch));
+    if (batch == nullptr) break;
+    RETURN_NOT_OK(Write(batch));
+  }
+  return Status::OK();
+}
+
+struct NextBasenameGenerator {
+  static Result<NextBasenameGenerator> Make(const std::string& 
basename_template) {
+    if (basename_template.find(fs::internal::kSep) != std::string::npos) {
+      return Status::Invalid("basename_template contained '/'");
+    }
+    size_t token_start = basename_template.find(token());
+    if (token_start == std::string::npos) {
+      return Status::Invalid("basename_template did not contain '{i}'");
+    }
+    return NextBasenameGenerator{basename_template, 0, token_start,
+                                 token_start + token().size()};
+  }
 
-  /// The basename of files written by this WriteTask. Extensions
-  /// are derived from format
-  std::string basename;
+  static const std::string& token() {
+    static const std::string token = "{i}";
+    return token;
+  }
 
-  /// The partitioning with which paths will be generated
-  std::shared_ptr<Partitioning> partitioning;
+  const std::string& template_;
+  size_t i_, token_start_, token_end_;
 
-  /// The format in which fragments will be written
-  std::shared_ptr<FileFormat> format;
+  std::string operator()() {
+    return template_.substr(0, token_start_) + std::to_string(i_++) +
+           template_.substr(token_end_);
+  }
+};
 
-  /// The FileSystem and base directory into which fragments will be written
-  std::shared_ptr<fs::FileSystem> filesystem;
-  std::string base_dir;
+using MutexedWriter = util::Mutexed<std::shared_ptr<FileWriter>>;
 
-  /// Batches to be written
-  std::shared_ptr<RecordBatchReader> batches;
+struct WriterSet {
+  WriterSet(NextBasenameGenerator next_basename,
+            const FileSystemDatasetWriteOptions& write_options)
+      : next_basename_(std::move(next_basename)),
+        base_dir_(fs::internal::EnsureTrailingSlash(write_options.base_dir)),
+        write_options_(write_options) {}
 
-  /// An Expression already satisfied by every batch to be written
-  std::shared_ptr<Expression> partition_expression;
-};
+  Result<std::shared_ptr<MutexedWriter>> Get(const Expression& 
partition_expression,
+                                             const std::shared_ptr<Schema>& 
schema) {
+    ARROW_ASSIGN_OR_RAISE(auto part_segments,
+                          
write_options_.partitioning->Format(partition_expression));
+    std::string dir = base_dir_ + part_segments;
 
-Status WriteTask::Execute() {
-  std::unordered_map<std::string, RecordBatchVector> path_to_batches;
-
-  // TODO(bkietz) these calls to Partition() should be scattered across a 
TaskGroup
-  for (auto maybe_batch : IteratorFromReader(batches)) {
-    ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-    ARROW_ASSIGN_OR_RAISE(auto partitioned_batches, 
partitioning->Partition(batch));
-    for (auto&& partitioned_batch : partitioned_batches) {
-      AndExpression expr(std::move(partitioned_batch.partition_expression),
-                         partition_expression);
-      ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr));
-      path = fs::internal::EnsureLeadingSlash(path);
-      path_to_batches[path].push_back(std::move(partitioned_batch.batch));
-    }
-  }
+    util::Mutex::Guard writer_lock;
+
+    auto set_lock = mutex_.Lock();
 
-  for (auto&& path_batches : path_to_batches) {
-    auto dir = base_dir + path_batches.first;
-    RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true));
+    auto writer =

Review comment:
       I'll add comments inline.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to