bkietz commented on a change in pull request #8305:
URL: https://github.com/apache/arrow/pull/8305#discussion_r499628240
##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -143,97 +145,205 @@ FragmentIterator FileSystemDataset::GetFragmentsImpl(
return MakeVectorIterator(std::move(fragments));
}
-struct WriteTask {
- Status Execute();
+Status FileWriter::Write(RecordBatchReader* batches) {
+ for (std::shared_ptr<RecordBatch> batch;;) {
+ RETURN_NOT_OK(batches->ReadNext(&batch));
+ if (batch == nullptr) break;
+ RETURN_NOT_OK(Write(batch));
+ }
+ return Status::OK();
+}
+
+struct NextBasenameGenerator {
+ static Result<NextBasenameGenerator> Make(const std::string& basename_template) {
+ if (basename_template.find(fs::internal::kSep) != std::string::npos) {
+ return Status::Invalid("basename_template contained '/'");
+ }
+ size_t token_start = basename_template.find(token());
+ if (token_start == std::string::npos) {
+ return Status::Invalid("basename_template did not contain '{i}'");
+ }
+ return NextBasenameGenerator{basename_template, 0, token_start,
+ token_start + token().size()};
+ }
- /// The basename of files written by this WriteTask. Extensions
- /// are derived from format
- std::string basename;
+ static const std::string& token() {
+ static const std::string token = "{i}";
+ return token;
+ }
- /// The partitioning with which paths will be generated
- std::shared_ptr<Partitioning> partitioning;
+ const std::string& template_;
+ size_t i_, token_start_, token_end_;
- /// The format in which fragments will be written
- std::shared_ptr<FileFormat> format;
+ std::string operator()() {
+ return template_.substr(0, token_start_) + std::to_string(i_++) +
+ template_.substr(token_end_);
+ }
+};
- /// The FileSystem and base directory into which fragments will be written
- std::shared_ptr<fs::FileSystem> filesystem;
- std::string base_dir;
+using MutexedWriter = util::Mutexed<std::shared_ptr<FileWriter>>;
- /// Batches to be written
- std::shared_ptr<RecordBatchReader> batches;
+struct WriterSet {
+ WriterSet(NextBasenameGenerator next_basename,
+ const FileSystemDatasetWriteOptions& write_options)
+ : next_basename_(std::move(next_basename)),
+ base_dir_(fs::internal::EnsureTrailingSlash(write_options.base_dir)),
+ write_options_(write_options) {}
- /// An Expression already satisfied by every batch to be written
- std::shared_ptr<Expression> partition_expression;
-};
+ Result<std::shared_ptr<MutexedWriter>> Get(const Expression& partition_expression,
+                                            const std::shared_ptr<Schema>& schema) {
+ ARROW_ASSIGN_OR_RAISE(auto part_segments,
+                       write_options_.partitioning->Format(partition_expression));
+ std::string dir = base_dir_ + part_segments;
-Status WriteTask::Execute() {
- std::unordered_map<std::string, RecordBatchVector> path_to_batches;
-
- // TODO(bkietz) these calls to Partition() should be scattered across a TaskGroup
- for (auto maybe_batch : IteratorFromReader(batches)) {
- ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
- ARROW_ASSIGN_OR_RAISE(auto partitioned_batches, partitioning->Partition(batch));
- for (auto&& partitioned_batch : partitioned_batches) {
- AndExpression expr(std::move(partitioned_batch.partition_expression),
- partition_expression);
- ARROW_ASSIGN_OR_RAISE(std::string path, partitioning->Format(expr));
- path = fs::internal::EnsureLeadingSlash(path);
- path_to_batches[path].push_back(std::move(partitioned_batch.batch));
- }
- }
+ util::Mutex::Guard writer_lock;
+
+ auto set_lock = mutex_.Lock();
- for (auto&& path_batches : path_to_batches) {
- auto dir = base_dir + path_batches.first;
- RETURN_NOT_OK(filesystem->CreateDir(dir, /*recursive=*/true));
+ auto writer =
+ internal::GetOrInsertGenerated(&dir_to_writer_, dir, [&](const std::string&) {
+ auto writer = std::make_shared<MutexedWriter>();
+ writer_lock = writer->Lock();
+ return writer;
+ })->second;
- auto path = fs::internal::ConcatAbstractPath(dir, basename);
- ARROW_ASSIGN_OR_RAISE(auto destination, filesystem->OpenOutputStream(path));
+ if (writer_lock) {
+ // NB: next_basename_() must be invoked with the set_lock held
+ auto path = fs::internal::ConcatAbstractPath(dir, next_basename_());
+ set_lock.Unlock();
- DCHECK(!path_batches.second.empty());
- ARROW_ASSIGN_OR_RAISE(auto reader,
-                       RecordBatchReader::Make(std::move(path_batches.second)));
- RETURN_NOT_OK(format->WriteFragment(reader.get(), destination.get()));
+ RETURN_NOT_OK(write_options_.filesystem->CreateDir(dir));
+
+ ARROW_ASSIGN_OR_RAISE(auto destination,
+ write_options_.filesystem->OpenOutputStream(path));
+
+ ARROW_ASSIGN_OR_RAISE(**writer, write_options_.format()->MakeWriter(
+ std::move(destination), schema,
+ write_options_.file_write_options));
+ }
+
+ return writer;
}
- return Status::OK();
-}
+ Status FinishAll(internal::TaskGroup* task_group) {
+ for (const auto& dir_writer : dir_to_writer_) {
+ task_group->Append([&] {
+ std::shared_ptr<FileWriter> writer = **dir_writer.second;
+ return writer->Finish();
+ });
+ }
-Status FileSystemDataset::Write(std::shared_ptr<Schema> schema,
- std::shared_ptr<FileFormat> format,
- std::shared_ptr<fs::FileSystem> filesystem,
- std::string base_dir,
- std::shared_ptr<Partitioning> partitioning,
- std::shared_ptr<ScanContext> scan_context,
- FragmentIterator fragment_it) {
- auto task_group = scan_context->TaskGroup();
+ return Status::OK();
+ }
+
+ // There should only be a single writer open for each partition directory at a time
+ util::Mutex mutex_;
+ std::unordered_map<std::string, std::shared_ptr<MutexedWriter>> dir_to_writer_;
+ NextBasenameGenerator next_basename_;
+ std::string base_dir_;
+ const FileSystemDatasetWriteOptions& write_options_;
+};
- base_dir = std::string(fs::internal::RemoveTrailingSlash(base_dir));
+Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
+ std::shared_ptr<Scanner> scanner) {
+ auto task_group = scanner->context()->TaskGroup();
- for (const auto& f : partitioning->schema()->fields()) {
+ for (const auto& f : write_options.partitioning->schema()->fields()) {
if (f->type()->id() == Type::DICTIONARY) {
return Status::NotImplemented("writing with dictionary partitions");
}
}
- int i = 0;
- for (auto maybe_fragment : fragment_it) {
- ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment);
- auto task = std::make_shared<WriteTask>();
-
- task->basename = "dat_" + std::to_string(i++) + "." + format->type_name();
- task->partition_expression = fragment->partition_expression();
- task->format = format;
- task->filesystem = filesystem;
- task->base_dir = base_dir;
- task->partitioning = partitioning;
-
- // make a record batch reader which yields from a fragment
- ARROW_ASSIGN_OR_RAISE(task->batches, FragmentRecordBatchReader::Make(
-                                          std::move(fragment), schema, scan_context));
- task_group->Append([task] { return task->Execute(); });
+ // Things we'll un-lazy for the sake of simplicity, with the tradeoff they represent:
+ //
+ // - Fragment iteration. Keeping this lazy would allow us to start partitioning/writing
+ //   any fragments we have before waiting for discovery to complete. This isn't
+ //   currently implemented for FileSystemDataset anyway: ARROW-8613
+ //
+ // - ScanTask iteration. Keeping this lazy would save some unnecessary blocking when
+ //   writing Fragments which produce scan tasks slowly. No Fragments do this.
+ //
+ // NB: neither of these will have any impact whatsoever on the common case of writing
+ //   an in-memory table to disk.
+ ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, scanner->GetFragments().ToVector());
+ ScanTaskVector scan_tasks;
+ std::vector<const Fragment*> fragment_for_task;
+
+ // Avoid contention with multithreaded readers
+ auto context = std::make_shared<ScanContext>(*scanner->context());
+ context->use_threads = false;
+
+ for (const auto& fragment : fragments) {
Review comment:
We *could* do two scans of the input data:
1. Assemble a list of all unique values in the partition columns of the data, from which we can determine the precise set of writers to open
2. Apply groupings to batches, passing the results to pre-opened writers

This doesn't seem worthwhile to me; scanning the input is potentially expensive, so we should avoid doing it twice. Furthermore, we'll still need to coordinate between threads, since two input batches might still contain rows bound for a single output writer.
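
To illustrate that coordination point, here's a minimal, self-contained sketch of the single-pass idea. It is not the `WriterSet`/`MutexedWriter` code from this change; the `WriterRegistry`, `Writer`, and `Batch` names are hypothetical stand-ins. Even without a preliminary partition-discovery scan, a mutex-protected map from partition directory to writer is enough to guarantee one open writer per partition, which is exactly the coordination a second scan would not remove:

```cpp
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// Hypothetical stand-ins for the dataset's batch and file writer types.
struct Batch {};
struct Writer {
  void Write(const Batch& /*batch*/) {}
};

// Lazily opens one writer per partition directory. The map is guarded by a
// mutex so that concurrent producers of batches for the same partition share
// a single writer instead of opening conflicting files.
class WriterRegistry {
 public:
  std::shared_ptr<Writer> GetOrOpen(const std::string& partition_dir) {
    std::lock_guard<std::mutex> lock(mutex_);
    auto it = dir_to_writer_.find(partition_dir);
    if (it == dir_to_writer_.end()) {
      // First batch for this partition opens the writer; later batches reuse it.
      it = dir_to_writer_.emplace(partition_dir, std::make_shared<Writer>()).first;
    }
    return it->second;
  }

 private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::shared_ptr<Writer>> dir_to_writer_;
};

int main() {
  WriterRegistry registry;
  Batch batch;
  // Batches arriving from different scan threads may target the same
  // partition, so both lookups return the same writer; per-writer locking
  // (as with MutexedWriter in this PR) is still needed around Write().
  registry.GetOrOpen("year=2020/month=10")->Write(batch);
  registry.GetOrOpen("year=2020/month=10")->Write(batch);
  return 0;
}
```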