This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 050ccee9ff GH-38011: [C++][Dataset] Change force close to tend to 
close on write (#38030)
050ccee9ff is described below

commit 050ccee9ff4e967a9e604f8e4829a2b400688ee1
Author: mwish <[email protected]>
AuthorDate: Fri Oct 6 04:19:09 2023 +0800

    GH-38011: [C++][Dataset] Change force close to tend to close on write 
(#38030)
    
    
    
    ### Rationale for this change
    
    `CloseLargestFile()` will fail to close a file when none of the files has 
written any rows.
    
    ### What changes are included in this PR?
    
    Change `CloseLargestFile()` to `TryCloseLargestFile()`, and do not raise an 
error when it cannot find a file that has written any rows.
    
    ### Are these changes tested?
    
    no
    
    ### Are there any user-facing changes?
    
    bugfix
    
    * Closes: #38011
    
    Authored-by: mwish <[email protected]>
    Signed-off-by: Sutou Kouhei <[email protected]>
---
 cpp/src/arrow/dataset/dataset_writer.cc | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/cpp/src/arrow/dataset/dataset_writer.cc 
b/cpp/src/arrow/dataset/dataset_writer.cc
index 686c7712fe..a2096d691b 100644
--- a/cpp/src/arrow/dataset/dataset_writer.cc
+++ b/cpp/src/arrow/dataset/dataset_writer.cc
@@ -139,7 +139,7 @@ class DatasetWriterFileQueue {
       : options_(options), schema_(schema), writer_state_(writer_state) {}
 
   void Start(util::AsyncTaskScheduler* file_tasks, const std::string& 
filename) {
-    file_tasks_ = std::move(file_tasks);
+    file_tasks_ = file_tasks;
     // Because the scheduler runs one task at a time we know the writer will
     // be opened before any attempt to write
     file_tasks_->AddSimpleTask(
@@ -575,7 +575,7 @@ class DatasetWriter::DatasetWriterImpl {
   }
 
  protected:
-  Status CloseLargestFile() {
+  Status TryCloseLargestFile() {
     std::shared_ptr<DatasetWriterDirectoryQueue> largest = nullptr;
     uint64_t largest_num_rows = 0;
     for (auto& dir_queue : directory_queues_) {
@@ -584,7 +584,10 @@ class DatasetWriter::DatasetWriterImpl {
         largest = dir_queue.second;
       }
     }
-    DCHECK_NE(largest, nullptr);
+    if (largest == nullptr) {
+      // GH-38011: If all written files has written 0 rows, we should not 
close any file
+      return Status::OK();
+    }
     return largest->FinishCurrentFile();
   }
 
@@ -618,7 +621,7 @@ class DatasetWriter::DatasetWriterImpl {
         backpressure = writer_state_.open_files_throttle.Acquire(1);
         if (!backpressure.is_finished()) {
           
EVENT_ON_CURRENT_SPAN("DatasetWriter::Backpressure::TooManyOpenFiles");
-          RETURN_NOT_OK(CloseLargestFile());
+          RETURN_NOT_OK(TryCloseLargestFile());
           break;
         }
       }

Reply via email to