emkornfield commented on code in PR #14964:
URL: https://github.com/apache/arrow/pull/14964#discussion_r1083083858


##########
cpp/src/parquet/page_index.cc:
##########
@@ -184,8 +185,241 @@ class OffsetIndexImpl : public OffsetIndex {
   std::vector<PageLocation> page_locations_;
 };
 
+class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader {
+ public:
+  RowGroupPageIndexReaderImpl(::arrow::io::RandomAccessFile* input,
+                              std::shared_ptr<RowGroupMetaData> 
row_group_metadata,
+                              const ReaderProperties& properties,
+                              int32_t row_group_ordinal,
+                              std::shared_ptr<InternalFileDecryptor> 
file_decryptor)
+      : input_(input),
+        row_group_metadata_(row_group_metadata),
+        properties_(properties),
+        file_decryptor_(std::move(file_decryptor)) {
+    bool has_column_index = false;
+    bool has_offset_index = false;
+    PageIndexReader::DeterminePageIndexRangesInRowGroup(
+        *row_group_metadata, &column_index_base_offset_, &column_index_size_,
+        &offset_index_base_offset_, &offset_index_size_, &has_column_index,
+        &has_offset_index);
+  }
+
+  /// Read column index of a column chunk.
+  ::arrow::Result<std::shared_ptr<ColumnIndex>> GetColumnIndex(int32_t i) 
override {
+    if (i < 0 || i >= row_group_metadata_->num_columns()) {
+      return ::arrow::Status::IndexError("Invalid column {} to get column 
index", i);
+    }
+
+    auto col_chunk = row_group_metadata_->ColumnChunk(i);
+
+    std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = 
col_chunk->crypto_metadata();
+    if (crypto_metadata != nullptr && file_decryptor_ == nullptr) {
+      ParquetException::NYI("Cannot read encrypted column index yet");
+    }
+
+    auto column_index_location = col_chunk->GetColumnIndexLocation();
+    if (!column_index_location.has_value()) {
+      return ::arrow::Status::Invalid("Column index does not exist for column 
{}", i);
+    }
+
+    if (column_index_buffer_ == nullptr) {
+      ARROW_ASSIGN_OR_RAISE(
+          column_index_buffer_,
+          input_->ReadAt(column_index_base_offset_, column_index_size_));
+    }
+
+    auto buffer = column_index_buffer_.get();
+    int64_t buffer_offset = column_index_location->offset - 
column_index_base_offset_;
+    uint32_t length = column_index_location->length;
+    DCHECK_GE(buffer_offset, 0);
+    DCHECK_LE(buffer_offset + length, column_index_size_);
+
+    auto descr = row_group_metadata_->schema()->Column(i);
+    std::shared_ptr<ColumnIndex> column_index;
+    try {
+      column_index =
+          ColumnIndex::Make(*descr, buffer->data() + buffer_offset, length, 
properties_);
+    } catch (...) {
+      return ::arrow::Status::SerializationError(
+          "Cannot deserialize column index for column {}", i);
+    }
+    return column_index;
+  }
+
+  /// Read offset index of a column chunk.
+  ::arrow::Result<std::shared_ptr<OffsetIndex>> GetOffsetIndex(int32_t i) 
override {
+    if (i < 0 || i >= row_group_metadata_->num_columns()) {
+      return ::arrow::Status::IndexError("Invalid column {} to get offset 
index", i);
+    }
+
+    auto col_chunk = row_group_metadata_->ColumnChunk(i);
+
+    std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = 
col_chunk->crypto_metadata();
+    if (crypto_metadata != nullptr && file_decryptor_ == nullptr) {
+      ParquetException::NYI("Cannot read encrypted offset index yet");
+    }
+
+    auto offset_index_location = col_chunk->GetOffsetIndexLocation();
+    if (!offset_index_location.has_value()) {
+      return ::arrow::Status::Invalid("Offset index does not exist for column 
{}", i);
+    }
+
+    if (offset_index_buffer_ == nullptr) {
+      ARROW_ASSIGN_OR_RAISE(
+          offset_index_buffer_,
+          input_->ReadAt(offset_index_base_offset_, offset_index_size_));
+    }
+
+    auto buffer = offset_index_buffer_.get();
+    int64_t buffer_offset = offset_index_location->offset - 
offset_index_base_offset_;
+    uint32_t length = offset_index_location->length;
+    DCHECK_GE(buffer_offset, 0);
+    DCHECK_LE(buffer_offset + length, offset_index_size_);
+
+    std::shared_ptr<OffsetIndex> offset_index;
+    try {
+      offset_index =
+          OffsetIndex::Make(buffer->data() + buffer_offset, length, 
properties_);
+    } catch (...) {
+      return ::arrow::Status::SerializationError(
+          "Cannot deserialize offset index for column {}", i);
+    }
+    return offset_index;
+  }
+
+ private:
+  /// The input stream that can perform random access read.
+  ::arrow::io::RandomAccessFile* input_;
+
+  /// The row group metadata to get column chunk metadata.
+  std::shared_ptr<RowGroupMetaData> row_group_metadata_;
+
+  /// Reader properties used to deserialize thrift object.
+  const ReaderProperties& properties_;
+
+  /// File-level decryptor.
+  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
+
+  /// File offsets and sizes of the page Index.
+  int64_t column_index_base_offset_;
+  int64_t column_index_size_;
+  int64_t offset_index_base_offset_;
+  int64_t offset_index_size_;
+
+  /// Buffer to hold the raw bytes of the page index.
+  /// Will be set lazily when the corresponding page index is accessed for the 
1st time.
+  std::shared_ptr<::arrow::Buffer> column_index_buffer_;
+  std::shared_ptr<::arrow::Buffer> offset_index_buffer_;
+};
+
+class PageIndexReaderImpl : public PageIndexReader {
+ public:
+  PageIndexReaderImpl(::arrow::io::RandomAccessFile* input,
+                      std::shared_ptr<FileMetaData> file_metadata,
+                      const ReaderProperties& properties,
+                      std::shared_ptr<InternalFileDecryptor> file_decryptor)
+      : input_(input),
+        file_metadata_(std::move(file_metadata)),
+        properties_(properties),
+        file_decryptor_(std::move(file_decryptor)) {}
+
+  ::arrow::Result<std::shared_ptr<RowGroupPageIndexReader>> RowGroup(int i) 
override {
+    if (i < 0 || i >= file_metadata_->num_row_groups()) {
+      return ::arrow::Status::IndexError("Invalid row group ordinal {}", i);
+    }
+    return std::make_shared<RowGroupPageIndexReaderImpl>(
+        input_, file_metadata_->RowGroup(i), properties_, i, file_decryptor_);
+  }
+
+  void WillNeed(const std::vector<int32_t>& row_group_indices, bool 
need_column_index,
+                bool need_offset_index) override {
+    std::vector<::arrow::io::ReadRange> read_ranges;
+    for (int32_t row_group_ordinal : row_group_indices) {
+      int64_t column_index_offset;
+      int64_t column_index_size;
+      int64_t offset_index_offset;
+      int64_t offset_index_size;
+      bool has_column_index;
+      bool has_offset_index;
+      PageIndexReader::DeterminePageIndexRangesInRowGroup(
+          *file_metadata_->RowGroup(row_group_ordinal), &column_index_offset,
+          &column_index_size, &offset_index_offset, &offset_index_size, 
&has_column_index,
+          &has_offset_index);
+      if (need_column_index && has_column_index) {
+        read_ranges.emplace_back(
+            ::arrow::io::ReadRange{column_index_offset, column_index_size});
+      }
+      if (need_offset_index && has_offset_index) {
+        read_ranges.emplace_back(
+            ::arrow::io::ReadRange{offset_index_offset, offset_index_size});
+      }
+    }
+    PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges));
+  }
+
+  void WillNotNeed(const std::vector<int32_t>& row_group_indices) override {
+    // No-op for now.
+  }
+
+ private:
+  /// The input stream that can perform random read.
+  ::arrow::io::RandomAccessFile* input_;
+
+  /// The file metadata to get row group metadata.
+  std::shared_ptr<FileMetaData> file_metadata_;
+
+  /// Reader properties used to deserialize thrift object.
+  const ReaderProperties& properties_;
+
+  /// File-level decrypter.
+  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
+};
+
 }  // namespace
 
+void PageIndexReader::DeterminePageIndexRangesInRowGroup(
+    const RowGroupMetaData& row_group_metadata, int64_t* column_index_start,
+    int64_t* column_index_size, int64_t* offset_index_start, int64_t* 
offset_index_size,
+    bool* has_column_index, bool* has_offset_index) {
+  int64_t ci_start = std::numeric_limits<int64_t>::max();
+  int64_t oi_start = std::numeric_limits<int64_t>::max();
+  int64_t ci_end = -1;
+  int64_t oi_end = -1;
+  for (int i = 0; i < row_group_metadata.num_columns(); ++i) {

Review Comment:
   it feels like this optimization could be worth it, especially in the tail.  
I've observed parquet files with O(1000) of columns, it isn't clear to me if 
page indexes are generally enabled for all columns or not though.  I think if 
we provided this method, then we simply document behavior as throwing an 
exception if the request wasn't requested, but it seems fine todo as a 
follow-up if we want it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to