emkornfield commented on code in PR #14964:
URL: https://github.com/apache/arrow/pull/14964#discussion_r1083089551
##########
cpp/src/parquet/page_index.cc:
##########
@@ -184,8 +185,219 @@ class OffsetIndexImpl : public OffsetIndex {
std::vector<PageLocation> page_locations_;
};
+/// Reads the column index and offset index of the column chunks of one row group.
+///
+/// For each kind of index, the bytes of the whole row group are fetched with a
+/// single ReadAt() call over the range computed by
+/// PageIndexReader::DeterminePageIndexRangesInRowGroup(), kept in a member buffer,
+/// and sliced per column chunk on every Get*Index() call.
+class RowGroupPageIndexReaderImpl : public RowGroupPageIndexReader {
+ public:
+  /// \param input file the page index bytes are read from (stored as a raw pointer)
+  /// \param row_group_metadata metadata of the row group served by this reader
+  /// \param properties reader properties forwarded to the index deserializers;
+  ///        stored by reference (NOTE(review): presumably must outlive this
+  ///        reader -- confirm against callers)
+  /// \param row_group_ordinal ordinal of the row group; not used by this class
+  /// \param file_decryptor file-level decryptor, stored but never used to decrypt
+  RowGroupPageIndexReaderImpl(::arrow::io::RandomAccessFile* input,
+                              std::shared_ptr<RowGroupMetaData> row_group_metadata,
+                              const ReaderProperties& properties,
+                              int32_t row_group_ordinal,
+                              std::shared_ptr<InternalFileDecryptor> file_decryptor)
+      : input_(input),
+        row_group_metadata_(std::move(row_group_metadata)),
+        properties_(properties),
+        file_decryptor_(std::move(file_decryptor)),
+        index_read_range_(
+            PageIndexReader::DeterminePageIndexRangesInRowGroup(*row_group_metadata_)) {}
+
+  /// Read column index of a column chunk.
+  ///
+  /// \param i ordinal of the column chunk within the row group
+  /// \return the deserialized column index, or nullptr if the column chunk does
+  ///         not record a column index location
+  /// \throws ParquetException if `i` is out of range, the column chunk is
+  ///         encrypted, the index location does not fit in the bytes read, or
+  ///         deserialization fails
+  std::shared_ptr<ColumnIndex> GetColumnIndex(int32_t i) override {
+    if (i < 0 || i >= row_group_metadata_->num_columns()) {
+      // ParquetException concatenates its arguments; "{}" is not a placeholder.
+      throw ParquetException("Invalid column ", i, " to get column index");
+    }
+
+    auto col_chunk = row_group_metadata_->ColumnChunk(i);
+
+    // Nothing in this class decrypts page index bytes, so an encrypted column
+    // chunk is unsupported whether or not a file decryptor was supplied.
+    std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata();
+    if (crypto_metadata != nullptr) {
+      ParquetException::NYI("Cannot read encrypted column index yet");
+    }
+
+    auto column_index_location = col_chunk->GetColumnIndexLocation();
+    if (!column_index_location.has_value()) {
+      return nullptr;
+    }
+
+    if (!index_read_range_.column_index.has_value()) {
+      throw ParquetException("Missing column index read range");
+    }
+
+    // Fetch the column index bytes of the whole row group on first access.
+    if (column_index_buffer_ == nullptr) {
+      PARQUET_ASSIGN_OR_THROW(column_index_buffer_,
+                              input_->ReadAt(index_read_range_.column_index->offset,
+                                             index_read_range_.column_index->length));
+    }
+
+    const int64_t buffer_offset =
+        column_index_location->offset - index_read_range_.column_index->offset;
+    const int64_t length = column_index_location->length;
+    // The location comes from file metadata, so validate it at runtime (not only
+    // with DCHECK) against the bytes actually read before slicing the buffer.
+    if (buffer_offset < 0 || length < 0 ||
+        length > column_index_buffer_->size() - buffer_offset) {
+      throw ParquetException("Invalid column index location for column ", i);
+    }
+
+    auto descr = row_group_metadata_->schema()->Column(i);
+    std::shared_ptr<ColumnIndex> column_index;
+    try {
+      column_index =
+          ColumnIndex::Make(*descr, column_index_buffer_->data() + buffer_offset,
+                            static_cast<uint32_t>(length), properties_);
+    } catch (...) {
+      throw ParquetException("Cannot deserialize column index for column ", i);
+    }
+    return column_index;
+  }
+
+  /// Read offset index of a column chunk.
+  ///
+  /// \param i ordinal of the column chunk within the row group
+  /// \return the deserialized offset index, or nullptr if the column chunk does
+  ///         not record an offset index location
+  /// \throws ParquetException if `i` is out of range, the column chunk is
+  ///         encrypted, the index location does not fit in the bytes read, or
+  ///         deserialization fails
+  std::shared_ptr<OffsetIndex> GetOffsetIndex(int32_t i) override {
+    if (i < 0 || i >= row_group_metadata_->num_columns()) {
+      // ParquetException concatenates its arguments; "{}" is not a placeholder.
+      throw ParquetException("Invalid column ", i, " to get offset index");
+    }
+
+    auto col_chunk = row_group_metadata_->ColumnChunk(i);
+
+    // Nothing in this class decrypts page index bytes, so an encrypted column
+    // chunk is unsupported whether or not a file decryptor was supplied.
+    std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata();
+    if (crypto_metadata != nullptr) {
+      ParquetException::NYI("Cannot read encrypted offset index yet");
+    }
+
+    auto offset_index_location = col_chunk->GetOffsetIndexLocation();
+    if (!offset_index_location.has_value()) {
+      return nullptr;
+    }
+
+    if (!index_read_range_.offset_index.has_value()) {
+      throw ParquetException("Missing offset index read range");
+    }
+
+    // Fetch the offset index bytes of the whole row group on first access.
+    if (offset_index_buffer_ == nullptr) {
+      PARQUET_ASSIGN_OR_THROW(offset_index_buffer_,
+                              input_->ReadAt(index_read_range_.offset_index->offset,
+                                             index_read_range_.offset_index->length));
+    }
+
+    const int64_t buffer_offset =
+        offset_index_location->offset - index_read_range_.offset_index->offset;
+    const int64_t length = offset_index_location->length;
+    // The location comes from file metadata, so validate it at runtime (not only
+    // with DCHECK) against the bytes actually read before slicing the buffer.
+    if (buffer_offset < 0 || length < 0 ||
+        length > offset_index_buffer_->size() - buffer_offset) {
+      throw ParquetException("Invalid offset index location for column ", i);
+    }
+
+    std::shared_ptr<OffsetIndex> offset_index;
+    try {
+      offset_index = OffsetIndex::Make(offset_index_buffer_->data() + buffer_offset,
+                                       static_cast<uint32_t>(length), properties_);
+    } catch (...) {
+      throw ParquetException("Cannot deserialize offset index for column ", i);
+    }
+    return offset_index;
+  }
+
+ private:
+  /// The input stream that can perform random access read.
+  ::arrow::io::RandomAccessFile* input_;
+
+  /// The row group metadata to get column chunk metadata.
+  std::shared_ptr<RowGroupMetaData> row_group_metadata_;
+
+  /// Reader properties used to deserialize thrift object.
+  const ReaderProperties& properties_;
+
+  /// File-level decryptor.
+  std::shared_ptr<InternalFileDecryptor> file_decryptor_;
+
+  /// File offsets and sizes of the page Index of all column chunks in the row group.
+  RowGroupIndexReadRange index_read_range_;
+
+  /// Buffer to hold the raw bytes of the page index.
+  /// Will be set lazily when the corresponding page index is accessed for the 1st time.
+  std::shared_ptr<::arrow::Buffer> column_index_buffer_;
+  std::shared_ptr<::arrow::Buffer> offset_index_buffer_;
+};
+
+class PageIndexReaderImpl : public PageIndexReader {
+ public:
+ PageIndexReaderImpl(::arrow::io::RandomAccessFile* input,
+ std::shared_ptr<FileMetaData> file_metadata,
+ const ReaderProperties& properties,
+ std::shared_ptr<InternalFileDecryptor> file_decryptor)
+ : input_(input),
+ file_metadata_(std::move(file_metadata)),
+ properties_(properties),
+ file_decryptor_(std::move(file_decryptor)) {}
+
+ std::shared_ptr<RowGroupPageIndexReader> RowGroup(int i) override {
+ if (i < 0 || i >= file_metadata_->num_row_groups()) {
+ throw ParquetException("Invalid row group ordinal {}", i);
+ }
+ return std::make_shared<RowGroupPageIndexReaderImpl>(
+ input_, file_metadata_->RowGroup(i), properties_, i, file_decryptor_);
+ }
+
+ void WillNeed(const std::vector<int32_t>& row_group_indices,
+ IndexSelection index_selection) override {
+ std::vector<::arrow::io::ReadRange> read_ranges;
+ for (int32_t row_group_ordinal : row_group_indices) {
+ auto read_range = PageIndexReader::DeterminePageIndexRangesInRowGroup(
+ *file_metadata_->RowGroup(row_group_ordinal));
+ if (index_selection.column_index && read_range.column_index.has_value())
{
+ read_ranges.push_back(*read_range.column_index);
+ }
+ if (index_selection.offset_index && read_range.offset_index.has_value())
{
+ read_ranges.push_back(*read_range.offset_index);
+ }
+ }
+ PARQUET_IGNORE_NOT_OK(input_->WillNeed(read_ranges));
Review Comment:
Could you add a comment explaining why the status returned by `WillNeed` is being ignored here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]