bkietz commented on a change in pull request #9482:
URL: https://github.com/apache/arrow/pull/9482#discussion_r578587245



##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -320,9 +325,44 @@ Result<ScanTaskIterator> 
ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions
   auto column_projection = InferColumnProjection(*reader, *options);
   ScanTaskVector tasks(row_groups.size());
 
+  // Manually pre-buffer. If we set pre_buffer on in the Arrow reader 
properties, each
+  // record batch reader we create will try to separately pre-buffer, which 
will lead to
+  // crashes as they all separately trample the Parquet file reader's internal 
state.
+  // Instead, pre-buffer once at the top level. This also has the advantage 
that we can
+  // coalesce reads across row groups.
+  struct {
+    arrow::Status operator()() {
+      if (once) {
+        BEGIN_PARQUET_CATCH_EXCEPTIONS
+        std::call_once(*once, [this]() {
+          reader->parquet_reader()->PreBuffer(row_groups, column_projection,
+                                              async_context, cache_options);
+        });
+        END_PARQUET_CATCH_EXCEPTIONS
+      }
+      return Status::OK();
+    }
+
+    std::shared_ptr<parquet::arrow::FileReader> reader;
+    std::shared_ptr<std::once_flag> once;
+    std::vector<int> row_groups;
+    std::vector<int> column_projection;
+    arrow::io::AsyncContext async_context;
+    arrow::io::CacheOptions cache_options;
+  } PreBufferTask;

Review comment:
       I think this would be a bit clearer inlined as members of 
ParquetScanTask:
   ```c++
       Status EnsurePreBuffered() {
         if (pre_buffer_once) {
           BEGIN_PARQUET_CATCH_EXCEPTIONS
           std::call_once(*pre_buffer_once, [this]() {
             reader->parquet_reader()->PreBuffer(row_groups, column_projection,
                                                 async_context, cache_options);
           });
           END_PARQUET_CATCH_EXCEPTIONS
         }
         return Status::OK();
       }
   
       std::shared_ptr<std::once_flag> pre_buffer_once;
       std::vector<int> row_groups;
       arrow::io::AsyncContext async_context;
       arrow::io::CacheOptions cache_options;
   };
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to