This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new ec77fab2f5 PARQUET-2316: [C++] Allow partial PreBuffer in the parquet 
FileReader (#36192)
ec77fab2f5 is described below

commit ec77fab2f5cdf51acfac25178b18411d18d8045c
Author: Jinpeng <[email protected]>
AuthorDate: Thu Jul 6 10:19:36 2023 -0400

    PARQUET-2316: [C++] Allow partial PreBuffer in the parquet FileReader 
(#36192)
    
    ### Rationale for this change
    
    The current FileReader can only work inĀ  one of the two modes, coalescing 
(when Prebuffer is called) and non-coalescing (when Prefufer is not called), 
due to the if statement 
[here](https://github.com/apache/arrow/blob/main/cpp/src/parquet/file_reader.cc#L203)
    
    Since Prebuffer is basically caching all specified column chunks, it would 
raise concerns on memory usage for systems with tight memory budget. In such 
scenarios, one may want to Prebuffer some small chunks while being able to read 
the rest chunks usingĀ  BufferedInputStream.
    
    ### What changes are included in this PR?
    
    Changes to support partial prebuffer on a subset of column chunks and a 
unit test
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    No.
    
    Authored-by: jp0317 <[email protected]>
    Signed-off-by: Gang Wu <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 32 +++++++++++++++++++++++
 cpp/src/parquet/file_reader.cc                    | 22 ++++++++++++++--
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index f82177b86f..69827d5c46 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -2394,6 +2394,38 @@ TEST(TestArrowReadWrite, WaitCoalescedReads) {
   ASSERT_EQ(actual_batch->num_rows(), num_rows);
 }
 
+// Use coalesced reads and non-coaleasced reads for different column chunks.
+TEST(TestArrowReadWrite, CoalescedReadsAndNonCoalescedReads) {
+  constexpr int num_columns = 5;
+  constexpr int num_rows = 128;
+
+  std::shared_ptr<Table> expected;
+  ASSERT_NO_FATAL_FAILURE(
+      MakeDoubleTable(num_columns, num_rows, /*nchunks=*/1, &expected));
+
+  std::shared_ptr<Buffer> buffer;
+  ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(expected, num_rows / 2,
+                                             
default_arrow_writer_properties(), &buffer));
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(), &reader));
+
+  ASSERT_EQ(2, reader->num_row_groups());
+
+  // Pre-buffer 3 columns in the 2nd row group.
+  const std::vector<int> row_groups = {1};
+  const std::vector<int> column_indices = {0, 1, 4};
+  reader->parquet_reader()->PreBuffer(row_groups, column_indices,
+                                      ::arrow::io::IOContext(),
+                                      ::arrow::io::CacheOptions::Defaults());
+  ASSERT_OK(reader->parquet_reader()->WhenBuffered(row_groups, 
column_indices).status());
+
+  ASSERT_OK_AND_ASSIGN(auto actual, ReadTableManually(reader.get()));
+
+  AssertTablesEqual(*actual, *expected, /*same_chunk_layout=*/false);
+}
+
 TEST(TestArrowReadWrite, GetRecordBatchReaderNoColumns) {
   ArrowReaderProperties properties = default_arrow_reader_properties();
   const int num_rows = 10;
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index 488c3b9592..fc30ddb43f 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -23,6 +23,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <unordered_map>
 #include <utility>
 
 #include "arrow/io/caching.h"
@@ -178,6 +179,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
                      std::shared_ptr<::arrow::io::internal::ReadRangeCache> 
cached_source,
                      int64_t source_size, FileMetaData* file_metadata,
                      int row_group_number, const ReaderProperties& props,
+                     std::unordered_set<int> prebuffered_column_chunks,
                      std::shared_ptr<InternalFileDecryptor> file_decryptor = 
nullptr)
       : source_(std::move(source)),
         cached_source_(std::move(cached_source)),
@@ -185,6 +187,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
         file_metadata_(file_metadata),
         properties_(props),
         row_group_ordinal_(row_group_number),
+        prebuffered_column_chunks_(std::move(prebuffered_column_chunks)),
         file_decryptor_(file_decryptor) {
     row_group_metadata_ = file_metadata->RowGroup(row_group_number);
   }
@@ -200,7 +203,8 @@ class SerializedRowGroup : public RowGroupReader::Contents {
     ::arrow::io::ReadRange col_range =
         ComputeColumnChunkRange(file_metadata_, source_size_, 
row_group_ordinal_, i);
     std::shared_ptr<ArrowInputStream> stream;
-    if (cached_source_) {
+    if (cached_source_ &&
+        prebuffered_column_chunks_.find(i) != 
prebuffered_column_chunks_.end()) {
       // PARQUET-1698: if read coalescing is enabled, read from pre-buffered
       // segments.
       PARQUET_ASSIGN_OR_THROW(auto buffer, cached_source_->Read(col_range));
@@ -268,6 +272,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
   std::unique_ptr<RowGroupMetaData> row_group_metadata_;
   ReaderProperties properties_;
   int row_group_ordinal_;
+  const std::unordered_set<int> prebuffered_column_chunks_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 };
 
@@ -297,9 +302,17 @@ class SerializedFile : public ParquetFileReader::Contents {
   }
 
   std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
+    std::unordered_set<int> prebuffered_column_chunks;
+    // Avoid updating the map as this function can be called concurrently. The 
map can
+    // only be updated within Prebuffer().
+    auto prebuffered_column_chunks_iter = prebuffered_column_chunks_.find(i);
+    if (prebuffered_column_chunks_iter != prebuffered_column_chunks_.end()) {
+      prebuffered_column_chunks = prebuffered_column_chunks_iter->second;
+    }
+
     std::unique_ptr<SerializedRowGroup> contents = 
std::make_unique<SerializedRowGroup>(
         source_, cached_source_, source_size_, file_metadata_.get(), i, 
properties_,
-        file_decryptor_);
+        std::move(prebuffered_column_chunks), file_decryptor_);
     return std::make_shared<RowGroupReader>(std::move(contents));
   }
 
@@ -351,8 +364,11 @@ class SerializedFile : public ParquetFileReader::Contents {
     cached_source_ =
         std::make_shared<::arrow::io::internal::ReadRangeCache>(source_, ctx, 
options);
     std::vector<::arrow::io::ReadRange> ranges;
+    prebuffered_column_chunks_.clear();
     for (int row : row_groups) {
+      std::unordered_set<int>& prebuffered = prebuffered_column_chunks_[row];
       for (int col : column_indices) {
+        prebuffered.insert(col);
         ranges.push_back(
             ComputeColumnChunkRange(file_metadata_.get(), source_size_, row, 
col));
       }
@@ -562,6 +578,8 @@ class SerializedFile : public ParquetFileReader::Contents {
   ReaderProperties properties_;
   std::shared_ptr<PageIndexReader> page_index_reader_;
   std::unique_ptr<BloomFilterReader> bloom_filter_reader_;
+  // Maps a row group to its column chunks that are cached via Prebuffer().
+  std::unordered_map<int, std::unordered_set<int>> prebuffered_column_chunks_;
   std::shared_ptr<InternalFileDecryptor> file_decryptor_;
 
   // \return The true length of the metadata in bytes

Reply via email to