bkietz commented on a change in pull request #9482:
URL: https://github.com/apache/arrow/pull/9482#discussion_r578639213
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -54,12 +55,20 @@ class ParquetScanTask : public ScanTask {
public:
ParquetScanTask(int row_group, std::vector<int> column_projection,
std::shared_ptr<parquet::arrow::FileReader> reader,
+ std::shared_ptr<std::once_flag> pre_buffer_once,
+ std::vector<int> pre_buffer_row_groups,
+ arrow::io::AsyncContext async_context,
+ arrow::io::CacheOptions cache_options,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context)
: ScanTask(std::move(options), std::move(context)),
row_group_(row_group),
column_projection_(std::move(column_projection)),
- reader_(std::move(reader)) {}
+ reader_(std::move(reader)),
+ pre_buffer_once_(std::move(pre_buffer_once)),
+ pre_buffer_row_groups_(std::move(pre_buffer_row_groups_)),
Review comment:
Critical typo :D :
```suggestion
pre_buffer_row_groups_(std::move(pre_buffer_row_groups)),
```
##########
File path: cpp/src/arrow/dataset/file_parquet.cc
##########
@@ -79,16 +88,42 @@ class ParquetScanTask : public ScanTask {
std::unique_ptr<RecordBatchReader> record_batch_reader;
} NextBatch;
+ RETURN_NOT_OK(EnsurePreBuffered());
NextBatch.file_reader = reader_;
RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_},
column_projection_,
&NextBatch.record_batch_reader));
return MakeFunctionIterator(std::move(NextBatch));
}
+ // Ensure that pre-buffering has been applied to the underlying Parquet reader
+ // exactly once (if needed). If we instead set pre_buffer on in the Arrow
+ // reader properties, each scan task will try to separately pre-buffer, which
+ // will lead to crashes as they trample the Parquet file reader's internal
+ // state. Instead, pre-buffer once at the file level. This also has the
+ // advantage that we can coalesce reads across row groups.
+ Status EnsurePreBuffered() {
+ if (pre_buffer_once_) {
+ BEGIN_PARQUET_CATCH_EXCEPTIONS
+ std::call_once(*pre_buffer_once_, [this]() {
+ reader_->parquet_reader()->PreBuffer(pre_buffer_row_groups_, column_projection_,
+ async_context_, cache_options_);
+ });
+ END_PARQUET_CATCH_EXCEPTIONS
+ }
+ return Status::OK();
+ }
+
private:
int row_group_;
std::vector<int> column_projection_;
std::shared_ptr<parquet::arrow::FileReader> reader_;
+ // Pre-buffering state. pre_buffer_once will be nullptr if no
+ // pre-buffering is to be done, and row_groups will be empty in that
+ // case. We assume all scan tasks have the same column projection.
Review comment:
Not really a problem, but you still set row_groups even when not
pre-buffering
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]