This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 050ccee9ff GH-38011: [C++][Dataset] Change force close to tend to
close on write (#38030)
050ccee9ff is described below
commit 050ccee9ff4e967a9e604f8e4829a2b400688ee1
Author: mwish <[email protected]>
AuthorDate: Fri Oct 6 04:19:09 2023 +0800
GH-38011: [C++][Dataset] Change force close to tend to close on write
(#38030)
### Rationale for this change
`CloseLargestFile()` will fail to close a file when none of the open files
has written any rows.
### What changes are included in this PR?
Change `CloseLargestFile()` to `TryCloseLargestFile()`, and do not raise an
error when it cannot find a file that has written any rows.
### Are these changes tested?
no
### Are there any user-facing changes?
bugfix
* Closes: #38011
Authored-by: mwish <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
cpp/src/arrow/dataset/dataset_writer.cc | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/cpp/src/arrow/dataset/dataset_writer.cc
b/cpp/src/arrow/dataset/dataset_writer.cc
index 686c7712fe..a2096d691b 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -139,7 +139,7 @@ class DatasetWriterFileQueue {
: options_(options), schema_(schema), writer_state_(writer_state) {}
void Start(util::AsyncTaskScheduler* file_tasks, const std::string&
filename) {
- file_tasks_ = std::move(file_tasks);
+ file_tasks_ = file_tasks;
// Because the scheduler runs one task at a time we know the writer will
// be opened before any attempt to write
file_tasks_->AddSimpleTask(
@@ -575,7 +575,7 @@ class DatasetWriter::DatasetWriterImpl {
}
protected:
- Status CloseLargestFile() {
+ Status TryCloseLargestFile() {
std::shared_ptr<DatasetWriterDirectoryQueue> largest = nullptr;
uint64_t largest_num_rows = 0;
for (auto& dir_queue : directory_queues_) {
@@ -584,7 +584,10 @@ class DatasetWriter::DatasetWriterImpl {
largest = dir_queue.second;
}
}
- DCHECK_NE(largest, nullptr);
+ if (largest == nullptr) {
+ // GH-38011: If all written files has written 0 rows, we should not
close any file
+ return Status::OK();
+ }
return largest->FinishCurrentFile();
}
@@ -618,7 +621,7 @@ class DatasetWriter::DatasetWriterImpl {
backpressure = writer_state_.open_files_throttle.Acquire(1);
if (!backpressure.is_finished()) {
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
- RETURN_NOT_OK(CloseLargestFile());
+ RETURN_NOT_OK(TryCloseLargestFile());
break;
}
}